]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/sftpd.py
f6e11cedf0022e64ba8f9c8120acac1e48782ddc
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / sftpd.py
1
2 import heapq, traceback, array, stat, struct
3 from types import NoneType
4 from stat import S_IFREG, S_IFDIR
5 from time import time, strftime, localtime
6
7 from zope.interface import implements
8 from twisted.python import components
9 from twisted.application import service, strports
10 from twisted.conch.ssh import factory, keys, session
11 from twisted.conch.ssh.filetransfer import FileTransferServer, SFTPError, \
12      FX_NO_SUCH_FILE, FX_OP_UNSUPPORTED, FX_PERMISSION_DENIED, FX_EOF, \
13      FX_BAD_MESSAGE, FX_FAILURE, FX_OK
14 from twisted.conch.ssh.filetransfer import FXF_READ, FXF_WRITE, FXF_APPEND, \
15      FXF_CREAT, FXF_TRUNC, FXF_EXCL
16 from twisted.conch.interfaces import ISFTPServer, ISFTPFile, IConchUser, ISession
17 from twisted.conch.avatar import ConchUser
18 from twisted.conch.openssh_compat import primes
19 from twisted.cred import portal
20 from twisted.internet.error import ProcessDone, ProcessTerminated
21 from twisted.python.failure import Failure
22 from twisted.internet.interfaces import ITransport
23
24 from twisted.internet import defer
25 from twisted.internet.interfaces import IConsumer
26 from foolscap.api import eventually
27 from allmydata.util import deferredutil
28
29 from allmydata.util.assertutil import _assert, precondition
30 from allmydata.util.consumer import download_to_data
31 from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
32      NoSuchChildError, ChildOfWrongTypeError
33 from allmydata.mutable.common import NotWriteableError
34 from allmydata.mutable.publish import MutableFileHandle
35 from allmydata.immutable.upload import FileHandle
36 from allmydata.dirnode import update_metadata
37 from allmydata.util.fileutil import EncryptedTemporaryFile
38
39 noisy = True
40 use_foolscap_logging = True
41
42 from allmydata.util.log import NOISY, OPERATIONAL, WEIRD, \
43     msg as _msg, err as _err, PrefixingLogMixin as _PrefixingLogMixin
44
45 if use_foolscap_logging:
46     (logmsg, logerr, PrefixingLogMixin) = (_msg, _err, _PrefixingLogMixin)
47 else:  # pragma: no cover
48     def logmsg(s, level=None):
49         print s
50     def logerr(s, level=None):
51         print s
52     class PrefixingLogMixin:
53         def __init__(self, facility=None, prefix=''):
54             self.prefix = prefix
55         def log(self, s, level=None):
56             print "%r %s" % (self.prefix, s)
57
58
59 def eventually_callback(d):
60     return lambda res: eventually(d.callback, res)
61
62 def eventually_errback(d):
63     return lambda err: eventually(d.errback, err)
64
65
66 def _utf8(x):
67     if isinstance(x, unicode):
68         return x.encode('utf-8')
69     if isinstance(x, str):
70         return x
71     return repr(x)
72
73
74 def _to_sftp_time(t):
75     """SFTP times are unsigned 32-bit integers representing UTC seconds
76     (ignoring leap seconds) since the Unix epoch, January 1 1970 00:00 UTC.
77     A Tahoe time is the corresponding float."""
78     return long(t) & 0xFFFFFFFFL
79
80
81 def _convert_error(res, request):
82     """If res is not a Failure, return it, otherwise reraise the appropriate
83     SFTPError."""
84
85     if not isinstance(res, Failure):
86         logged_res = res
87         if isinstance(res, str): logged_res = "<data of length %r>" % (len(res),)
88         logmsg("SUCCESS %r %r" % (request, logged_res,), level=OPERATIONAL)
89         return res
90
91     err = res
92     logmsg("RAISE %r %r" % (request, err.value), level=OPERATIONAL)
93     try:
94         if noisy: logmsg(traceback.format_exc(err.value), level=NOISY)
95     except Exception:  # pragma: no cover
96         pass
97
98     # The message argument to SFTPError must not reveal information that
99     # might compromise anonymity, if we are running over an anonymous network.
100
101     if err.check(SFTPError):
102         # original raiser of SFTPError has responsibility to ensure anonymity
103         raise err
104     if err.check(NoSuchChildError):
105         childname = _utf8(err.value.args[0])
106         raise SFTPError(FX_NO_SUCH_FILE, childname)
107     if err.check(NotWriteableError) or err.check(ChildOfWrongTypeError):
108         msg = _utf8(err.value.args[0])
109         raise SFTPError(FX_PERMISSION_DENIED, msg)
110     if err.check(ExistingChildError):
111         # Versions of SFTP after v3 (which is what twisted.conch implements)
112         # define a specific error code for this case: FX_FILE_ALREADY_EXISTS.
113         # However v3 doesn't; instead, other servers such as sshd return
114         # FX_FAILURE. The gvfs SFTP backend, for example, depends on this
115         # to translate the error to the equivalent of POSIX EEXIST, which is
116         # necessary for some picky programs (such as gedit).
117         msg = _utf8(err.value.args[0])
118         raise SFTPError(FX_FAILURE, msg)
119     if err.check(NotImplementedError):
120         raise SFTPError(FX_OP_UNSUPPORTED, _utf8(err.value))
121     if err.check(EOFError):
122         raise SFTPError(FX_EOF, "end of file reached")
123     if err.check(defer.FirstError):
124         _convert_error(err.value.subFailure, request)
125
126     # We assume that the error message is not anonymity-sensitive.
127     raise SFTPError(FX_FAILURE, _utf8(err.value))
128
129
130 def _repr_flags(flags):
131     return "|".join([f for f in
132                      [(flags & FXF_READ)   and "FXF_READ"   or None,
133                       (flags & FXF_WRITE)  and "FXF_WRITE"  or None,
134                       (flags & FXF_APPEND) and "FXF_APPEND" or None,
135                       (flags & FXF_CREAT)  and "FXF_CREAT"  or None,
136                       (flags & FXF_TRUNC)  and "FXF_TRUNC"  or None,
137                       (flags & FXF_EXCL)   and "FXF_EXCL"   or None,
138                      ]
139                      if f])
140
141
142 def _lsLine(name, attrs):
143     st_uid = "tahoe"
144     st_gid = "tahoe"
145     st_mtime = attrs.get("mtime", 0)
146     st_mode = attrs["permissions"]
147
148     # Some clients won't tolerate '?' in the size field (#1337).
149     st_size = attrs.get("size", 0)
150
151     # We don't know how many links there really are to this object.
152     st_nlink = 1
153
154     # Based on <https://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
155     # We previously could not call the version in Twisted because we needed the change
156     # <https://twistedmatrix.com/trac/changeset/25412> (released in Twisted v8.2).
157     # Since we now depend on Twisted v10.1, consider calling Twisted's version.
158
159     mode = st_mode
160     perms = array.array('c', '-'*10)
161     ft = stat.S_IFMT(mode)
162     if   stat.S_ISDIR(ft): perms[0] = 'd'
163     elif stat.S_ISREG(ft): perms[0] = '-'
164     else: perms[0] = '?'
165     # user
166     if mode&stat.S_IRUSR: perms[1] = 'r'
167     if mode&stat.S_IWUSR: perms[2] = 'w'
168     if mode&stat.S_IXUSR: perms[3] = 'x'
169     # group
170     if mode&stat.S_IRGRP: perms[4] = 'r'
171     if mode&stat.S_IWGRP: perms[5] = 'w'
172     if mode&stat.S_IXGRP: perms[6] = 'x'
173     # other
174     if mode&stat.S_IROTH: perms[7] = 'r'
175     if mode&stat.S_IWOTH: perms[8] = 'w'
176     if mode&stat.S_IXOTH: perms[9] = 'x'
177     # suid/sgid never set
178
179     l = perms.tostring()
180     l += str(st_nlink).rjust(5) + ' '
181     un = str(st_uid)
182     l += un.ljust(9)
183     gr = str(st_gid)
184     l += gr.ljust(9)
185     sz = str(st_size)
186     l += sz.rjust(8)
187     l += ' '
188     day = 60 * 60 * 24
189     sixmo = day * 7 * 26
190     now = time()
191     if st_mtime + sixmo < now or st_mtime > now + day:
192         # mtime is more than 6 months ago, or more than one day in the future
193         l += strftime("%b %d  %Y ", localtime(st_mtime))
194     else:
195         l += strftime("%b %d %H:%M ", localtime(st_mtime))
196     l += name
197     return l
198
199
200 def _no_write(parent_readonly, child, metadata=None):
201     """Whether child should be listed as having read-only permissions in parent."""
202
203     if child.is_unknown():
204         return True
205     elif child.is_mutable():
206         return child.is_readonly()
207     elif parent_readonly or IDirectoryNode.providedBy(child):
208         return True
209     else:
210         return metadata is not None and metadata.get('no-write', False)
211
212
213 def _populate_attrs(childnode, metadata, size=None):
214     attrs = {}
215
216     # The permissions must have the S_IFDIR (040000) or S_IFREG (0100000)
217     # bits, otherwise the client may refuse to open a directory.
218     # Also, sshfs run as a non-root user requires files and directories
219     # to be world-readable/writeable.
220     # It is important that we never set the executable bits on files.
221     #
222     # Directories and unknown nodes have no size, and SFTP doesn't
223     # require us to make one up.
224     #
225     # childnode might be None, meaning that the file doesn't exist yet,
226     # but we're going to write it later.
227
228     if childnode and childnode.is_unknown():
229         perms = 0
230     elif childnode and IDirectoryNode.providedBy(childnode):
231         perms = S_IFDIR | 0777
232     else:
233         # For files, omit the size if we don't immediately know it.
234         if childnode and size is None:
235             size = childnode.get_size()
236         if size is not None:
237             _assert(isinstance(size, (int, long)) and not isinstance(size, bool), size=size)
238             attrs['size'] = size
239         perms = S_IFREG | 0666
240
241     if metadata:
242         if metadata.get('no-write', False):
243             perms &= S_IFDIR | S_IFREG | 0555  # clear 'w' bits
244
245         # See webapi.txt for what these times mean.
246         # We would prefer to omit atime, but SFTP version 3 can only
247         # accept mtime if atime is also set.
248         if 'linkmotime' in metadata.get('tahoe', {}):
249             attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['tahoe']['linkmotime'])
250         elif 'mtime' in metadata:
251             attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['mtime'])
252
253         if 'linkcrtime' in metadata.get('tahoe', {}):
254             attrs['createtime'] = _to_sftp_time(metadata['tahoe']['linkcrtime'])
255
256     attrs['permissions'] = perms
257
258     # twisted.conch.ssh.filetransfer only implements SFTP version 3,
259     # which doesn't include SSH_FILEXFER_ATTR_FLAGS.
260
261     return attrs
262
263
264 def _attrs_to_metadata(attrs):
265     metadata = {}
266
267     for key in attrs:
268         if key == "mtime" or key == "ctime" or key == "createtime":
269             metadata[key] = long(attrs[key])
270         elif key.startswith("ext_"):
271             metadata[key] = str(attrs[key])
272
273     perms = attrs.get('permissions', stat.S_IWUSR)
274     if not (perms & stat.S_IWUSR):
275         metadata['no-write'] = True
276
277     return metadata
278
279
280 def _direntry_for(filenode_or_parent, childname, filenode=None):
281     precondition(isinstance(childname, (unicode, NoneType)), childname=childname)
282
283     if childname is None:
284         filenode_or_parent = filenode
285
286     if filenode_or_parent:
287         rw_uri = filenode_or_parent.get_write_uri()
288         if rw_uri and childname:
289             return rw_uri + "/" + childname.encode('utf-8')
290         else:
291             return rw_uri
292
293     return None
294
295
296 class OverwriteableFileConsumer(PrefixingLogMixin):
297     implements(IConsumer)
298     """I act both as a consumer for the download of the original file contents, and as a
299     wrapper for a temporary file that records the downloaded data and any overwrites.
300     I use a priority queue to keep track of which regions of the file have been overwritten
301     but not yet downloaded, so that the download does not clobber overwritten data.
302     I use another priority queue to record milestones at which to make callbacks
303     indicating that a given number of bytes have been downloaded.
304
305     The temporary file reflects the contents of the file that I represent, except that:
306      - regions that have neither been downloaded nor overwritten, if present,
307        contain garbage.
308      - the temporary file may be shorter than the represented file (it is never longer).
309        The latter's current size is stored in self.current_size.
310
311     This abstraction is mostly independent of SFTP. Consider moving it, if it is found
312     useful for other frontends."""
313
314     def __init__(self, download_size, tempfile_maker):
315         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
316         if noisy: self.log(".__init__(%r, %r)" % (download_size, tempfile_maker), level=NOISY)
317         self.download_size = download_size
318         self.current_size = download_size
319         self.f = tempfile_maker()
320         self.downloaded = 0
321         self.milestones = []  # empty heap of (offset, d)
322         self.overwrites = []  # empty heap of (start, end)
323         self.is_closed = False
324
325         self.done = defer.Deferred()
326         self.done_status = None  # None -> not complete, Failure -> download failed, str -> download succeeded
327         self.producer = None
328
329     def get_file(self):
330         return self.f
331
332     def get_current_size(self):
333         return self.current_size
334
335     def set_current_size(self, size):
336         if noisy: self.log(".set_current_size(%r), current_size = %r, downloaded = %r" %
337                            (size, self.current_size, self.downloaded), level=NOISY)
338         if size < self.current_size or size < self.downloaded:
339             self.f.truncate(size)
340         if size > self.current_size:
341             self.overwrite(self.current_size, "\x00" * (size - self.current_size))
342         self.current_size = size
343
344         # make the invariant self.download_size <= self.current_size be true again
345         if size < self.download_size:
346             self.download_size = size
347
348         if self.downloaded >= self.download_size:
349             self.download_done("size changed")
350
351     def registerProducer(self, p, streaming):
352         if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY)
353         if self.producer is not None:
354             raise RuntimeError("producer is already registered")
355
356         self.producer = p
357         if streaming:
358             # call resumeProducing once to start things off
359             p.resumeProducing()
360         else:
361             def _iterate():
362                 if self.done_status is None:
363                     p.resumeProducing()
364                     eventually(_iterate)
365             _iterate()
366
367     def write(self, data):
368         if noisy: self.log(".write(<data of length %r>)" % (len(data),), level=NOISY)
369         if self.is_closed:
370             return
371
372         if self.downloaded >= self.download_size:
373             return
374
375         next_downloaded = self.downloaded + len(data)
376         if next_downloaded > self.download_size:
377             data = data[:(self.download_size - self.downloaded)]
378
379         while len(self.overwrites) > 0:
380             (start, end) = self.overwrites[0]
381             if start >= next_downloaded:
382                 # This and all remaining overwrites are after the data we just downloaded.
383                 break
384             if start > self.downloaded:
385                 # The data we just downloaded has been partially overwritten.
386                 # Write the prefix of it that precedes the overwritten region.
387                 self.f.seek(self.downloaded)
388                 self.f.write(data[:(start - self.downloaded)])
389
390             # This merges consecutive overwrites if possible, which allows us to detect the
391             # case where the download can be stopped early because the remaining region
392             # to download has already been fully overwritten.
393             heapq.heappop(self.overwrites)
394             while len(self.overwrites) > 0:
395                 (start1, end1) = self.overwrites[0]
396                 if start1 > end:
397                     break
398                 end = end1
399                 heapq.heappop(self.overwrites)
400
401             if end >= next_downloaded:
402                 # This overwrite extends past the downloaded data, so there is no
403                 # more data to consider on this call.
404                 heapq.heappush(self.overwrites, (next_downloaded, end))
405                 self._update_downloaded(next_downloaded)
406                 return
407             elif end >= self.downloaded:
408                 data = data[(end - self.downloaded):]
409                 self._update_downloaded(end)
410
411         self.f.seek(self.downloaded)
412         self.f.write(data)
413         self._update_downloaded(next_downloaded)
414
415     def _update_downloaded(self, new_downloaded):
416         self.downloaded = new_downloaded
417         milestone = new_downloaded
418         if len(self.overwrites) > 0:
419             (start, end) = self.overwrites[0]
420             if start <= new_downloaded and end > milestone:
421                 milestone = end
422
423         while len(self.milestones) > 0:
424             (next, d) = self.milestones[0]
425             if next > milestone:
426                 return
427             if noisy: self.log("MILESTONE %r %r" % (next, d), level=NOISY)
428             heapq.heappop(self.milestones)
429             eventually_callback(d)("reached")
430
431         if milestone >= self.download_size:
432             self.download_done("reached download size")
433
434     def overwrite(self, offset, data):
435         if noisy: self.log(".overwrite(%r, <data of length %r>)" % (offset, len(data)), level=NOISY)
436         if self.is_closed:
437             self.log("overwrite called on a closed OverwriteableFileConsumer", level=WEIRD)
438             raise SFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")
439
440         if offset > self.current_size:
441             # Normally writing at an offset beyond the current end-of-file
442             # would leave a hole that appears filled with zeroes. However, an
443             # EncryptedTemporaryFile doesn't behave like that (if there is a
444             # hole in the file on disk, the zeroes that are read back will be
445             # XORed with the keystream). So we must explicitly write zeroes in
446             # the gap between the current EOF and the offset.
447
448             self.f.seek(self.current_size)
449             self.f.write("\x00" * (offset - self.current_size))
450             start = self.current_size
451         else:
452             self.f.seek(offset)
453             start = offset
454
455         self.f.write(data)
456         end = offset + len(data)
457         self.current_size = max(self.current_size, end)
458         if end > self.downloaded:
459             heapq.heappush(self.overwrites, (start, end))
460
461     def read(self, offset, length):
462         """When the data has been read, callback the Deferred that we return with this data.
463         Otherwise errback the Deferred that we return.
464         The caller must perform no more overwrites until the Deferred has fired."""
465
466         if noisy: self.log(".read(%r, %r), current_size = %r" % (offset, length, self.current_size), level=NOISY)
467         if self.is_closed:
468             self.log("read called on a closed OverwriteableFileConsumer", level=WEIRD)
469             raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
470
471         # Note that the overwrite method is synchronous. When a write request is processed
472         # (e.g. a writeChunk request on the async queue of GeneralSFTPFile), overwrite will
473         # be called and will update self.current_size if necessary before returning. Therefore,
474         # self.current_size will be up-to-date for a subsequent call to this read method, and
475         # so it is correct to do the check for a read past the end-of-file here.
476         if offset >= self.current_size:
477             def _eof(): raise EOFError("read past end of file")
478             return defer.execute(_eof)
479
480         if offset + length > self.current_size:
481             length = self.current_size - offset
482             if noisy: self.log("truncating read to %r bytes" % (length,), level=NOISY)
483
484         needed = min(offset + length, self.download_size)
485
486         # If we fail to reach the needed number of bytes, the read request will fail.
487         d = self.when_reached_or_failed(needed)
488         def _reached_in_read(res):
489             # It is not necessarily the case that self.downloaded >= needed, because
490             # the file might have been truncated (thus truncating the download) and
491             # then extended.
492
493             _assert(self.current_size >= offset + length,
494                     current_size=self.current_size, offset=offset, length=length)
495             if noisy: self.log("_reached_in_read(%r), self.f = %r" % (res, self.f,), level=NOISY)
496             self.f.seek(offset)
497             return self.f.read(length)
498         d.addCallback(_reached_in_read)
499         return d
500
501     def when_reached_or_failed(self, index):
502         if noisy: self.log(".when_reached_or_failed(%r)" % (index,), level=NOISY)
503         def _reached(res):
504             if noisy: self.log("reached %r with result %r" % (index, res), level=NOISY)
505             return res
506
507         if self.done_status is not None:
508             return defer.execute(_reached, self.done_status)
509         if index <= self.downloaded:  # already reached successfully
510             if noisy: self.log("already reached %r successfully" % (index,), level=NOISY)
511             return defer.succeed("already reached successfully")
512         d = defer.Deferred()
513         d.addCallback(_reached)
514         heapq.heappush(self.milestones, (index, d))
515         return d
516
517     def when_done(self):
518         d = defer.Deferred()
519         self.done.addCallback(lambda ign: eventually_callback(d)(self.done_status))
520         return d
521
522     def download_done(self, res):
523         _assert(isinstance(res, (str, Failure)), res=res)
524         # Only the first call to download_done counts, but we log subsequent calls
525         # (multiple calls are normal).
526         if self.done_status is not None:
527             self.log("IGNORING extra call to download_done with result %r; previous result was %r"
528                      % (res, self.done_status), level=OPERATIONAL)
529             return
530
531         self.log("DONE with result %r" % (res,), level=OPERATIONAL)
532
533         # We avoid errbacking self.done so that we are not left with an 'Unhandled error in Deferred'
534         # in case when_done() is never called. Instead we stash the failure in self.done_status,
535         # from where the callback added in when_done() can retrieve it.
536         self.done_status = res
537         eventually_callback(self.done)(None)
538
539         while len(self.milestones) > 0:
540             (next, d) = self.milestones[0]
541             if noisy: self.log("MILESTONE FINISH %r %r %r" % (next, d, res), level=NOISY)
542             heapq.heappop(self.milestones)
543             # The callback means that the milestone has been reached if
544             # it is ever going to be. Note that the file may have been
545             # truncated to before the milestone.
546             eventually_callback(d)(res)
547
548     def close(self):
549         if not self.is_closed:
550             self.is_closed = True
551             try:
552                 self.f.close()
553             except Exception, e:
554                 self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
555         self.download_done("closed")
556         return self.done_status
557
558     def unregisterProducer(self):
559         # This will happen just before our client calls download_done, which will tell
560         # us the outcome of the download; we don't know the outcome at this point.
561         self.producer = None
562         self.log("producer unregistered", level=NOISY)
563
564
565 SIZE_THRESHOLD = 1000
566
567
568 class ShortReadOnlySFTPFile(PrefixingLogMixin):
569     implements(ISFTPFile)
570     """I represent a file handle to a particular file on an SFTP connection.
571     I am used only for short immutable files opened in read-only mode.
572     When I am created, the file contents start to be downloaded to memory.
573     self.async is used to delay read requests until the download has finished."""
574
575     def __init__(self, userpath, filenode, metadata):
576         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
577         if noisy: self.log(".__init__(%r, %r, %r)" % (userpath, filenode, metadata), level=NOISY)
578
579         precondition(isinstance(userpath, str) and IFileNode.providedBy(filenode),
580                      userpath=userpath, filenode=filenode)
581         self.filenode = filenode
582         self.metadata = metadata
583         self.async = download_to_data(filenode)
584         self.closed = False
585
586     def readChunk(self, offset, length):
587         request = ".readChunk(%r, %r)" % (offset, length)
588         self.log(request, level=OPERATIONAL)
589
590         if self.closed:
591             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
592             return defer.execute(_closed)
593
594         d = defer.Deferred()
595         def _read(data):
596             if noisy: self.log("_read(<data of length %r>) in readChunk(%r, %r)" % (len(data), offset, length), level=NOISY)
597
598             # "In response to this request, the server will read as many bytes as it
599             #  can from the file (up to 'len'), and return them in a SSH_FXP_DATA
600             #  message.  If an error occurs or EOF is encountered before reading any
601             #  data, the server will respond with SSH_FXP_STATUS.  For normal disk
602             #  files, it is guaranteed that this will read the specified number of
603             #  bytes, or up to end of file."
604             #
605             # i.e. we respond with an EOF error iff offset is already at EOF.
606
607             if offset >= len(data):
608                 eventually_errback(d)(Failure(SFTPError(FX_EOF, "read at or past end of file")))
609             else:
610                 eventually_callback(d)(data[offset:offset+length])  # truncated if offset+length > len(data)
611             return data
612         self.async.addCallbacks(_read, eventually_errback(d))
613         d.addBoth(_convert_error, request)
614         return d
615
616     def writeChunk(self, offset, data):
617         self.log(".writeChunk(%r, <data of length %r>) denied" % (offset, len(data)), level=OPERATIONAL)
618
619         def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
620         return defer.execute(_denied)
621
622     def close(self):
623         self.log(".close()", level=OPERATIONAL)
624
625         self.closed = True
626         return defer.succeed(None)
627
628     def getAttrs(self):
629         request = ".getAttrs()"
630         self.log(request, level=OPERATIONAL)
631
632         if self.closed:
633             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
634             return defer.execute(_closed)
635
636         d = defer.execute(_populate_attrs, self.filenode, self.metadata)
637         d.addBoth(_convert_error, request)
638         return d
639
640     def setAttrs(self, attrs):
641         self.log(".setAttrs(%r) denied" % (attrs,), level=OPERATIONAL)
642         def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
643         return defer.execute(_denied)
644
645
646 class GeneralSFTPFile(PrefixingLogMixin):
647     implements(ISFTPFile)
648     """I represent a file handle to a particular file on an SFTP connection.
649     I wrap an instance of OverwriteableFileConsumer, which is responsible for
650     storing the file contents. In order to allow write requests to be satisfied
651     immediately, there is effectively a FIFO queue between requests made to this
652     file handle, and requests to my OverwriteableFileConsumer. This queue is
653     implemented by the callback chain of self.async.
654
655     When first constructed, I am in an 'unopened' state that causes most
656     operations to be delayed until 'open' is called."""
657
658     def __init__(self, userpath, flags, close_notify, convergence):
659         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
660         if noisy: self.log(".__init__(%r, %r = %r, %r, <convergence censored>)" %
661                            (userpath, flags, _repr_flags(flags), close_notify), level=NOISY)
662
663         precondition(isinstance(userpath, str), userpath=userpath)
664         self.userpath = userpath
665         self.flags = flags
666         self.close_notify = close_notify
667         self.convergence = convergence
668         self.async = defer.Deferred()
669         # Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created.
670         self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL)
671         self.closed = False
672         self.abandoned = False
673         self.parent = None
674         self.childname = None
675         self.filenode = None
676         self.metadata = None
677
678         # self.consumer should only be relied on in callbacks for self.async, since it might
679         # not be set before then.
680         self.consumer = None
681
682     def open(self, parent=None, childname=None, filenode=None, metadata=None):
683         self.log(".open(parent=%r, childname=%r, filenode=%r, metadata=%r)" %
684                  (parent, childname, filenode, metadata), level=OPERATIONAL)
685
686         precondition(isinstance(childname, (unicode, NoneType)), childname=childname)
687         precondition(filenode is None or IFileNode.providedBy(filenode), filenode=filenode)
688         precondition(not self.closed, sftpfile=self)
689
690         # If the file has been renamed, the new (parent, childname) takes precedence.
691         if self.parent is None:
692             self.parent = parent
693         if self.childname is None:
694             self.childname = childname
695         self.filenode = filenode
696         self.metadata = metadata
697
698         tempfile_maker = EncryptedTemporaryFile
699
700         if (self.flags & FXF_TRUNC) or not filenode:
701             # We're either truncating or creating the file, so we don't need the old contents.
702             self.consumer = OverwriteableFileConsumer(0, tempfile_maker)
703             self.consumer.download_done("download not needed")
704         else:
705             self.async.addCallback(lambda ignored: filenode.get_best_readable_version())
706
707             def _read(version):
708                 if noisy: self.log("_read", level=NOISY)
709                 download_size = version.get_size()
710                 _assert(download_size is not None)
711
712                 self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker)
713
714                 d = version.read(self.consumer, 0, None)
715                 def _finished(res):
716                     if not isinstance(res, Failure):
717                         res = "download finished"
718                     self.consumer.download_done(res)
719                 d.addBoth(_finished)
720                 # It is correct to drop d here.
721             self.async.addCallback(_read)
722
723         eventually_callback(self.async)(None)
724
725         if noisy: self.log("open done", level=NOISY)
726         return self
727
728     def get_userpath(self):
729         return self.userpath
730
731     def get_direntry(self):
732         return _direntry_for(self.parent, self.childname)
733
734     def rename(self, new_userpath, new_parent, new_childname):
735         self.log(".rename(%r, %r, %r)" % (new_userpath, new_parent, new_childname), level=OPERATIONAL)
736
737         precondition(isinstance(new_userpath, str) and isinstance(new_childname, unicode),
738                      new_userpath=new_userpath, new_childname=new_childname)
739         self.userpath = new_userpath
740         self.parent = new_parent
741         self.childname = new_childname
742
743     def abandon(self):
744         self.log(".abandon()", level=OPERATIONAL)
745
746         self.abandoned = True
747
748     def sync(self, ign=None):
749         # The ign argument allows some_file.sync to be used as a callback.
750         self.log(".sync()", level=OPERATIONAL)
751
752         d = defer.Deferred()
753         self.async.addBoth(eventually_callback(d))
754         def _done(res):
755             if noisy: self.log("_done(%r) in .sync()" % (res,), level=NOISY)
756             return res
757         d.addBoth(_done)
758         return d
759
760     def readChunk(self, offset, length):
761         request = ".readChunk(%r, %r)" % (offset, length)
762         self.log(request, level=OPERATIONAL)
763
764         if not (self.flags & FXF_READ):
765             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for reading")
766             return defer.execute(_denied)
767
768         if self.closed:
769             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
770             return defer.execute(_closed)
771
772         d = defer.Deferred()
773         def _read(ign):
774             if noisy: self.log("_read in readChunk(%r, %r)" % (offset, length), level=NOISY)
775             d2 = self.consumer.read(offset, length)
776             d2.addBoth(eventually_callback(d))
777             # It is correct to drop d2 here.
778             return None
779         self.async.addCallbacks(_read, eventually_errback(d))
780         d.addBoth(_convert_error, request)
781         return d
782
783     def writeChunk(self, offset, data):
784         self.log(".writeChunk(%r, <data of length %r>)" % (offset, len(data)), level=OPERATIONAL)
785
786         if not (self.flags & FXF_WRITE):
787             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
788             return defer.execute(_denied)
789
790         if self.closed:
791             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")
792             return defer.execute(_closed)
793
794         self.has_changed = True
795
796         # Note that we return without waiting for the write to occur. Reads and
797         # close wait for prior writes, and will fail if any prior operation failed.
798         # This is ok because SFTP makes no guarantee that the write completes
799         # before the request does. In fact it explicitly allows write errors to be
800         # delayed until close:
801         #   "One should note that on some server platforms even a close can fail.
802         #    This can happen e.g. if the server operating system caches writes,
803         #    and an error occurs while flushing cached writes during the close."
804
805         def _write(ign):
806             if noisy: self.log("_write in .writeChunk(%r, <data of length %r>), current_size = %r" %
807                                (offset, len(data), self.consumer.get_current_size()), level=NOISY)
808             # FXF_APPEND means that we should always write at the current end of file.
809             write_offset = offset
810             if self.flags & FXF_APPEND:
811                 write_offset = self.consumer.get_current_size()
812
813             self.consumer.overwrite(write_offset, data)
814             if noisy: self.log("overwrite done", level=NOISY)
815             return None
816         self.async.addCallback(_write)
817         # don't addErrback to self.async, just allow subsequent async ops to fail.
818         return defer.succeed(None)
819
820     def _do_close(self, res, d=None):
821         if noisy: self.log("_do_close(%r)" % (res,), level=NOISY)
822         status = None
823         if self.consumer:
824             status = self.consumer.close()
825
826         # We must close_notify before re-firing self.async.
827         if self.close_notify:
828             self.close_notify(self.userpath, self.parent, self.childname, self)
829
830         if not isinstance(res, Failure) and isinstance(status, Failure):
831             res = status
832
833         if d:
834             eventually_callback(d)(res)
835         elif isinstance(res, Failure):
836             self.log("suppressing %r" % (res,), level=OPERATIONAL)
837
838     def close(self):
839         request = ".close()"
840         self.log(request, level=OPERATIONAL)
841
842         if self.closed:
843             return defer.succeed(None)
844
845         # This means that close has been called, not that the close has succeeded.
846         self.closed = True
847
848         if not (self.flags & (FXF_WRITE | FXF_CREAT)):
849             # We never fail a close of a handle opened only for reading, even if the file
850             # failed to download. (We could not do so deterministically, because it would
851             # depend on whether we reached the point of failure before abandoning the
852             # download.) Any reads that depended on file content that could not be downloaded
853             # will have failed. It is important that we don't close the consumer until
854             # previous read operations have completed.
855             self.async.addBoth(self._do_close)
856             return defer.succeed(None)
857
858         # We must capture the abandoned, parent, and childname variables synchronously
859         # at the close call. This is needed by the correctness arguments in the comments
860         # for _abandon_any_heisenfiles and _rename_heisenfiles.
861         # Note that the file must have been opened before it can be closed.
862         abandoned = self.abandoned
863         parent = self.parent
864         childname = self.childname
865
866         # has_changed is set when writeChunk is called, not when the write occurs, so
867         # it is correct to optimize out the commit if it is False at the close call.
868         has_changed = self.has_changed
869
870         def _commit(ign):
871             d2 = self.consumer.when_done()
872             if self.filenode and self.filenode.is_mutable():
873                 self.log("update mutable file %r childname=%r metadata=%r"
874                          % (self.filenode, childname, self.metadata), level=OPERATIONAL)
875                 if self.metadata.get('no-write', False) and not self.filenode.is_readonly():
876                     _assert(parent and childname, parent=parent, childname=childname, metadata=self.metadata)
877                     d2.addCallback(lambda ign: parent.set_metadata_for(childname, self.metadata))
878
879                 d2.addCallback(lambda ign: self.filenode.overwrite(MutableFileHandle(self.consumer.get_file())))
880             else:
881                 def _add_file(ign):
882                     self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
883                     u = FileHandle(self.consumer.get_file(), self.convergence)
884                     return parent.add_file(childname, u, metadata=self.metadata)
885                 d2.addCallback(_add_file)
886             return d2
887
888         # If the file has been abandoned, we don't want the close operation to get "stuck",
889         # even if self.async fails to re-fire. Completing the close independently of self.async
890         # in that case should ensure that dropping an ssh connection is sufficient to abandon
891         # any heisenfiles that were not explicitly closed in that connection.
892         if abandoned or not has_changed:
893             d = defer.succeed(None)
894             self.async.addBoth(self._do_close)
895         else:
896             d = defer.Deferred()
897             self.async.addCallback(_commit)
898             self.async.addBoth(self._do_close, d)
899         d.addBoth(_convert_error, request)
900         return d
901
902     def getAttrs(self):
903         request = ".getAttrs()"
904         self.log(request, level=OPERATIONAL)
905
906         if self.closed:
907             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
908             return defer.execute(_closed)
909
910         # Optimization for read-only handles, when we already know the metadata.
911         if not (self.flags & (FXF_WRITE | FXF_CREAT)) and self.metadata and self.filenode and not self.filenode.is_mutable():
912             return defer.succeed(_populate_attrs(self.filenode, self.metadata))
913
914         d = defer.Deferred()
915         def _get(ign):
916             if noisy: self.log("_get(%r) in %r, filenode = %r, metadata = %r" % (ign, request, self.filenode, self.metadata), level=NOISY)
917
918             # self.filenode might be None, but that's ok.
919             attrs = _populate_attrs(self.filenode, self.metadata, size=self.consumer.get_current_size())
920             eventually_callback(d)(attrs)
921             return None
922         self.async.addCallbacks(_get, eventually_errback(d))
923         d.addBoth(_convert_error, request)
924         return d
925
926     def setAttrs(self, attrs, only_if_at=None):
927         request = ".setAttrs(%r, only_if_at=%r)" % (attrs, only_if_at)
928         self.log(request, level=OPERATIONAL)
929
930         if not (self.flags & FXF_WRITE):
931             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
932             return defer.execute(_denied)
933
934         if self.closed:
935             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot set attributes for a closed file handle")
936             return defer.execute(_closed)
937
938         size = attrs.get("size", None)
939         if size is not None and (not isinstance(size, (int, long)) or size < 0):
940             def _bad(): raise SFTPError(FX_BAD_MESSAGE, "new size is not a valid nonnegative integer")
941             return defer.execute(_bad)
942
943         d = defer.Deferred()
944         def _set(ign):
945             if noisy: self.log("_set(%r) in %r" % (ign, request), level=NOISY)
946             current_direntry = _direntry_for(self.parent, self.childname, self.filenode)
947             if only_if_at and only_if_at != current_direntry:
948                 if noisy: self.log("not setting attributes: current_direntry=%r in %r" %
949                                    (current_direntry, request), level=NOISY)
950                 return None
951
952             now = time()
953             self.metadata = update_metadata(self.metadata, _attrs_to_metadata(attrs), now)
954             if size is not None:
955                 # TODO: should we refuse to truncate a file opened with FXF_APPEND?
956                 # <http://allmydata.org/trac/tahoe-lafs/ticket/1037#comment:20>
957                 self.consumer.set_current_size(size)
958             eventually_callback(d)(None)
959             return None
960         self.async.addCallbacks(_set, eventually_errback(d))
961         d.addBoth(_convert_error, request)
962         return d
963
964
965 class StoppableList:
966     def __init__(self, items):
967         self.items = items
968     def __iter__(self):
969         for i in self.items:
970             yield i
971     def close(self):
972         pass
973
974
975 class Reason:
976     def __init__(self, value):
977         self.value = value
978
979
980 # A "heisenfile" is a file that has been opened with write flags
981 # (FXF_WRITE and/or FXF_CREAT) and not yet close-notified.
982 # 'all_heisenfiles' maps from a direntry string to a list of
983 # GeneralSFTPFile.
984 #
985 # A direntry string is parent_write_uri + "/" + childname_utf8 for
986 # an immutable file, or file_write_uri for a mutable file.
987 # Updates to this dict are single-threaded.
988
989 all_heisenfiles = {}
990
991 def _reload():
992     global all_heisenfiles
993     all_heisenfiles = {}
994
995 class SFTPUserHandler(ConchUser, PrefixingLogMixin):
996     implements(ISFTPServer)
997     def __init__(self, client, rootnode, username):
998         ConchUser.__init__(self)
999         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=username)
1000         if noisy: self.log(".__init__(%r, %r, %r)" % (client, rootnode, username), level=NOISY)
1001
1002         self.channelLookup["session"] = session.SSHSession
1003         self.subsystemLookup["sftp"] = FileTransferServer
1004
1005         self._client = client
1006         self._root = rootnode
1007         self._username = username
1008         self._convergence = client.convergence
1009
1010         # maps from UTF-8 paths for this user, to files written and still open
1011         self._heisenfiles = {}
1012
1013     def gotVersion(self, otherVersion, extData):
1014         self.log(".gotVersion(%r, %r)" % (otherVersion, extData), level=OPERATIONAL)
1015
1016         # advertise the same extensions as the OpenSSH SFTP server
1017         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
1018         return {'posix-rename@openssh.com': '1',
1019                 'statvfs@openssh.com': '2',
1020                 'fstatvfs@openssh.com': '2',
1021                }
1022
1023     def logout(self):
1024         self.log(".logout()", level=OPERATIONAL)
1025
1026         for files in self._heisenfiles.itervalues():
1027             for f in files:
1028                 f.abandon()
1029
1030     def _add_heisenfile_by_path(self, file):
1031         self.log("._add_heisenfile_by_path(%r)" % (file,), level=OPERATIONAL)
1032
1033         userpath = file.get_userpath()
1034         if userpath in self._heisenfiles:
1035             self._heisenfiles[userpath] += [file]
1036         else:
1037             self._heisenfiles[userpath] = [file]
1038
1039     def _add_heisenfile_by_direntry(self, file):
1040         self.log("._add_heisenfile_by_direntry(%r)" % (file,), level=OPERATIONAL)
1041
1042         direntry = file.get_direntry()
1043         if direntry:
1044             if direntry in all_heisenfiles:
1045                 all_heisenfiles[direntry] += [file]
1046             else:
1047                 all_heisenfiles[direntry] = [file]
1048
1049     def _abandon_any_heisenfiles(self, userpath, direntry):
1050         request = "._abandon_any_heisenfiles(%r, %r)" % (userpath, direntry)
1051         self.log(request, level=OPERATIONAL)
1052
1053         precondition(isinstance(userpath, str), userpath=userpath)
1054
1055         # First we synchronously mark all heisenfiles matching the userpath or direntry
1056         # as abandoned, and remove them from the two heisenfile dicts. Then we .sync()
1057         # each file that we abandoned.
1058         #
1059         # For each file, the call to .abandon() occurs:
1060         #   * before the file is closed, in which case it will never be committed
1061         #     (uploaded+linked or published); or
1062         #   * after it is closed but before it has been close_notified, in which case the
1063         #     .sync() ensures that it has been committed (successfully or not) before we
1064         #     return.
1065         #
1066         # This avoids a race that might otherwise cause the file to be committed after
1067         # the remove operation has completed.
1068         #
1069         # We return a Deferred that fires with True if any files were abandoned (this
1070         # does not mean that they were not committed; it is used to determine whether
1071         # a NoSuchChildError from the attempt to delete the file should be suppressed).
1072
1073         files = []
1074         if direntry in all_heisenfiles:
1075             files = all_heisenfiles[direntry]
1076             del all_heisenfiles[direntry]
1077         if userpath in self._heisenfiles:
1078             files += self._heisenfiles[userpath]
1079             del self._heisenfiles[userpath]
1080
1081         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1082
1083         for f in files:
1084             f.abandon()
1085
1086         d = defer.succeed(None)
1087         for f in files:
1088             d.addBoth(f.sync)
1089
1090         def _done(ign):
1091             self.log("done %r" % (request,), level=OPERATIONAL)
1092             return len(files) > 0
1093         d.addBoth(_done)
1094         return d
1095
1096     def _rename_heisenfiles(self, from_userpath, from_parent, from_childname,
1097                             to_userpath, to_parent, to_childname, overwrite=True):
1098         request = ("._rename_heisenfiles(%r, %r, %r, %r, %r, %r, overwrite=%r)" %
1099                    (from_userpath, from_parent, from_childname, to_userpath, to_parent, to_childname, overwrite))
1100         self.log(request, level=OPERATIONAL)
1101
1102         precondition((isinstance(from_userpath, str) and isinstance(from_childname, unicode) and
1103                       isinstance(to_userpath, str) and isinstance(to_childname, unicode)),
1104                      from_userpath=from_userpath, from_childname=from_childname, to_userpath=to_userpath, to_childname=to_childname)
1105
1106         if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
1107
1108         # First we synchronously rename all heisenfiles matching the userpath or direntry.
1109         # Then we .sync() each file that we renamed.
1110         #
1111         # For each file, the call to .rename occurs:
1112         #   * before the file is closed, in which case it will be committed at the
1113         #     new direntry; or
1114         #   * after it is closed but before it has been close_notified, in which case the
1115         #     .sync() ensures that it has been committed (successfully or not) before we
1116         #     return.
1117         #
1118         # This avoids a race that might otherwise cause the file to be committed at the
1119         # old name after the rename operation has completed.
1120         #
1121         # Note that if overwrite is False, the caller should already have checked
1122         # whether a real direntry exists at the destination. It is possible that another
1123         # direntry (heisen or real) comes to exist at the destination after that check,
1124         # but in that case it is correct for the rename to succeed (and for the commit
1125         # of the heisenfile at the destination to possibly clobber the other entry, since
1126         # that can happen anyway when we have concurrent write handles to the same direntry).
1127         #
1128         # We return a Deferred that fires with True if any files were renamed (this
1129         # does not mean that they were not committed; it is used to determine whether
1130         # a NoSuchChildError from the rename attempt should be suppressed). If overwrite
1131         # is False and there were already heisenfiles at the destination userpath or
1132         # direntry, we return a Deferred that fails with SFTPError(FX_PERMISSION_DENIED).
1133
1134         from_direntry = _direntry_for(from_parent, from_childname)
1135         to_direntry = _direntry_for(to_parent, to_childname)
1136
1137         if noisy: self.log("from_direntry = %r, to_direntry = %r, len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
1138                            (from_direntry, to_direntry, len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
1139
1140         if not overwrite and (to_userpath in self._heisenfiles or to_direntry in all_heisenfiles):
1141             def _existing(): raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1142             if noisy: self.log("existing", level=NOISY)
1143             return defer.execute(_existing)
1144
1145         from_files = []
1146         if from_direntry in all_heisenfiles:
1147             from_files = all_heisenfiles[from_direntry]
1148             del all_heisenfiles[from_direntry]
1149         if from_userpath in self._heisenfiles:
1150             from_files += self._heisenfiles[from_userpath]
1151             del self._heisenfiles[from_userpath]
1152
1153         if noisy: self.log("from_files = %r in %r" % (from_files, request), level=NOISY)
1154
1155         for f in from_files:
1156             f.rename(to_userpath, to_parent, to_childname)
1157             self._add_heisenfile_by_path(f)
1158             self._add_heisenfile_by_direntry(f)
1159
1160         d = defer.succeed(None)
1161         for f in from_files:
1162             d.addBoth(f.sync)
1163
1164         def _done(ign):
1165             if noisy: self.log("done: len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
1166                                (len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
1167             return len(from_files) > 0
1168         d.addBoth(_done)
1169         return d
1170
1171     def _update_attrs_for_heisenfiles(self, userpath, direntry, attrs):
1172         request = "._update_attrs_for_heisenfiles(%r, %r, %r)" % (userpath, direntry, attrs)
1173         self.log(request, level=OPERATIONAL)
1174
1175         _assert(isinstance(userpath, str) and isinstance(direntry, str),
1176                 userpath=userpath, direntry=direntry)
1177
1178         files = []
1179         if direntry in all_heisenfiles:
1180             files = all_heisenfiles[direntry]
1181         if userpath in self._heisenfiles:
1182             files += self._heisenfiles[userpath]
1183
1184         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1185
1186         # We set the metadata for all heisenfiles at this path or direntry.
1187         # Since a direntry includes a write URI, we must have authority to
1188         # change the metadata of heisenfiles found in the all_heisenfiles dict.
1189         # However that's not necessarily the case for heisenfiles found by
1190         # path. Therefore we tell the setAttrs method of each file to only
1191         # perform the update if the file is at the correct direntry.
1192
1193         d = defer.succeed(None)
1194         for f in files:
1195             d.addBoth(f.setAttrs, attrs, only_if_at=direntry)
1196
1197         def _done(ign):
1198             self.log("done %r" % (request,), level=OPERATIONAL)
1199             # TODO: this should not return True if only_if_at caused all files to be skipped.
1200             return len(files) > 0
1201         d.addBoth(_done)
1202         return d
1203
1204     def _sync_heisenfiles(self, userpath, direntry, ignore=None):
1205         request = "._sync_heisenfiles(%r, %r, ignore=%r)" % (userpath, direntry, ignore)
1206         self.log(request, level=OPERATIONAL)
1207
1208         _assert(isinstance(userpath, str) and isinstance(direntry, (str, NoneType)),
1209                 userpath=userpath, direntry=direntry)
1210
1211         files = []
1212         if direntry in all_heisenfiles:
1213             files = all_heisenfiles[direntry]
1214         if userpath in self._heisenfiles:
1215             files += self._heisenfiles[userpath]
1216
1217         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1218
1219         d = defer.succeed(None)
1220         for f in files:
1221             if f is not ignore:
1222                 d.addBoth(f.sync)
1223
1224         def _done(ign):
1225             self.log("done %r" % (request,), level=OPERATIONAL)
1226             return None
1227         d.addBoth(_done)
1228         return d
1229
1230     def _remove_heisenfile(self, userpath, parent, childname, file_to_remove):
1231         if noisy: self.log("._remove_heisenfile(%r, %r, %r, %r)" % (userpath, parent, childname, file_to_remove), level=NOISY)
1232
1233         _assert(isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)),
1234                 userpath=userpath, childname=childname)
1235
1236         direntry = _direntry_for(parent, childname)
1237         if direntry in all_heisenfiles:
1238             all_old_files = all_heisenfiles[direntry]
1239             all_new_files = [f for f in all_old_files if f is not file_to_remove]
1240             if len(all_new_files) > 0:
1241                 all_heisenfiles[direntry] = all_new_files
1242             else:
1243                 del all_heisenfiles[direntry]
1244
1245         if userpath in self._heisenfiles:
1246             old_files = self._heisenfiles[userpath]
1247             new_files = [f for f in old_files if f is not file_to_remove]
1248             if len(new_files) > 0:
1249                 self._heisenfiles[userpath] = new_files
1250             else:
1251                 del self._heisenfiles[userpath]
1252
1253         if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
1254
1255     def _make_file(self, existing_file, userpath, flags, parent=None, childname=None, filenode=None, metadata=None):
1256         if noisy: self.log("._make_file(%r, %r, %r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
1257                            (existing_file, userpath, flags, _repr_flags(flags), parent, childname, filenode, metadata),
1258                            level=NOISY)
1259
1260         _assert((isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)) and
1261                 (metadata is None or 'no-write' in metadata)),
1262                 userpath=userpath, childname=childname, metadata=metadata)
1263
1264         writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
1265         direntry = _direntry_for(parent, childname, filenode)
1266
1267         d = self._sync_heisenfiles(userpath, direntry, ignore=existing_file)
1268
1269         if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
1270             d.addCallback(lambda ign: ShortReadOnlySFTPFile(userpath, filenode, metadata))
1271         else:
1272             close_notify = None
1273             if writing:
1274                 close_notify = self._remove_heisenfile
1275
1276             d.addCallback(lambda ign: existing_file or GeneralSFTPFile(userpath, flags, close_notify, self._convergence))
1277             def _got_file(file):
1278                 file.open(parent=parent, childname=childname, filenode=filenode, metadata=metadata)
1279                 if writing:
1280                     self._add_heisenfile_by_direntry(file)
1281                 return file
1282             d.addCallback(_got_file)
1283         return d
1284
1285     def openFile(self, pathstring, flags, attrs, delay=None):
1286         request = ".openFile(%r, %r = %r, %r, delay=%r)" % (pathstring, flags, _repr_flags(flags), attrs, delay)
1287         self.log(request, level=OPERATIONAL)
1288
1289         # This is used for both reading and writing.
1290         # First exclude invalid combinations of flags, and empty paths.
1291
1292         if not (flags & (FXF_READ | FXF_WRITE)):
1293             def _bad_readwrite():
1294                 raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
1295             return defer.execute(_bad_readwrite)
1296
1297         if (flags & FXF_EXCL) and not (flags & FXF_CREAT):
1298             def _bad_exclcreat():
1299                 raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT")
1300             return defer.execute(_bad_exclcreat)
1301
1302         path = self._path_from_string(pathstring)
1303         if not path:
1304             def _emptypath(): raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty")
1305             return defer.execute(_emptypath)
1306
1307         # The combination of flags is potentially valid.
1308
1309         # To work around clients that have race condition bugs, a getAttr, rename, or
1310         # remove request following an 'open' request with FXF_WRITE or FXF_CREAT flags,
1311         # should succeed even if the 'open' request has not yet completed. So we now
1312         # synchronously add a file object into the self._heisenfiles dict, indexed
1313         # by its UTF-8 userpath. (We can't yet add it to the all_heisenfiles dict,
1314         # because we don't yet have a user-independent path for the file.) The file
1315         # object does not know its filenode, parent, or childname at this point.
1316
1317         userpath = self._path_to_utf8(path)
1318
1319         if flags & (FXF_WRITE | FXF_CREAT):
1320             file = GeneralSFTPFile(userpath, flags, self._remove_heisenfile, self._convergence)
1321             self._add_heisenfile_by_path(file)
1322         else:
1323             # We haven't decided which file implementation to use yet.
1324             file = None
1325
1326         desired_metadata = _attrs_to_metadata(attrs)
1327
1328         # Now there are two major cases:
1329         #
1330         #  1. The path is specified as /uri/FILECAP, with no parent directory.
1331         #     If the FILECAP is mutable and writeable, then we can open it in write-only
1332         #     or read/write mode (non-exclusively), otherwise we can only open it in
1333         #     read-only mode. The open should succeed immediately as long as FILECAP is
1334         #     a valid known filecap that grants the required permission.
1335         #
1336         #  2. The path is specified relative to a parent. We find the parent dirnode and
1337         #     get the child's URI and metadata if it exists. There are four subcases:
1338         #       a. the child does not exist: FXF_CREAT must be set, and we must be able
1339         #          to write to the parent directory.
1340         #       b. the child exists but is not a valid known filecap: fail
1341         #       c. the child is mutable: if we are trying to open it write-only or
1342         #          read/write, then we must be able to write to the file.
1343         #       d. the child is immutable: if we are trying to open it write-only or
1344         #          read/write, then we must be able to write to the parent directory.
1345         #
1346         # To reduce latency, open normally succeeds as soon as these conditions are
1347         # met, even though there might be a failure in downloading the existing file
1348         # or uploading a new one. However, there is an exception: if a file has been
1349         # written, then closed, and is now being reopened, then we have to delay the
1350         # open until the previous upload/publish has completed. This is necessary
1351         # because sshfs does not wait for the result of an FXF_CLOSE message before
1352         # reporting to the client that a file has been closed. It applies both to
1353         # mutable files, and to directory entries linked to an immutable file.
1354         #
1355         # Note that the permission checks below are for more precise error reporting on
1356         # the open call; later operations would fail even if we did not make these checks.
1357
1358         d = delay or defer.succeed(None)
1359         d.addCallback(lambda ign: self._get_root(path))
1360         def _got_root( (root, path) ):
1361             if root.is_unknown():
1362                 raise SFTPError(FX_PERMISSION_DENIED,
1363                                 "cannot open an unknown cap (or child of an unknown object). "
1364                                 "Upgrading the gateway to a later Tahoe-LAFS version may help")
1365             if not path:
1366                 # case 1
1367                 if noisy: self.log("case 1: root = %r, path[:-1] = %r" % (root, path[:-1]), level=NOISY)
1368                 if not IFileNode.providedBy(root):
1369                     raise SFTPError(FX_PERMISSION_DENIED,
1370                                     "cannot open a directory cap")
1371                 if (flags & FXF_WRITE) and root.is_readonly():
1372                     raise SFTPError(FX_PERMISSION_DENIED,
1373                                     "cannot write to a non-writeable filecap without a parent directory")
1374                 if flags & FXF_EXCL:
1375                     raise SFTPError(FX_FAILURE,
1376                                     "cannot create a file exclusively when it already exists")
1377
1378                 # The file does not need to be added to all_heisenfiles, because it is not
1379                 # associated with a directory entry that needs to be updated.
1380
1381                 metadata = update_metadata(None, desired_metadata, time())
1382
1383                 # We have to decide what to pass for the 'parent_readonly' argument to _no_write,
1384                 # given that we don't actually have a parent. This only affects the permissions
1385                 # reported by a getAttrs on this file handle in the case of an immutable file.
1386                 # We choose 'parent_readonly=True' since that will cause the permissions to be
1387                 # reported as r--r--r--, which is appropriate because an immutable file can't be
1388                 # written via this path.
1389
1390                 metadata['no-write'] = _no_write(True, root)
1391                 return self._make_file(file, userpath, flags, filenode=root, metadata=metadata)
1392             else:
1393                 # case 2
1394                 childname = path[-1]
1395
1396                 if noisy: self.log("case 2: root = %r, childname = %r, desired_metadata = %r, path[:-1] = %r" %
1397                                    (root, childname, desired_metadata, path[:-1]), level=NOISY)
1398                 d2 = root.get_child_at_path(path[:-1])
1399                 def _got_parent(parent):
1400                     if noisy: self.log("_got_parent(%r)" % (parent,), level=NOISY)
1401                     if parent.is_unknown():
1402                         raise SFTPError(FX_PERMISSION_DENIED,
1403                                         "cannot open a child of an unknown object. "
1404                                         "Upgrading the gateway to a later Tahoe-LAFS version may help")
1405
1406                     parent_readonly = parent.is_readonly()
1407                     d3 = defer.succeed(None)
1408                     if flags & FXF_EXCL:
1409                         # FXF_EXCL means that the link to the file (not the file itself) must
1410                         # be created atomically wrt updates by this storage client.
1411                         # That is, we need to create the link before returning success to the
1412                         # SFTP open request (and not just on close, as would normally be the
1413                         # case). We make the link initially point to a zero-length LIT file,
1414                         # which is consistent with what might happen on a POSIX filesystem.
1415
1416                         if parent_readonly:
1417                             raise SFTPError(FX_FAILURE,
1418                                             "cannot create a file exclusively when the parent directory is read-only")
1419
1420                         # 'overwrite=False' ensures failure if the link already exists.
1421                         # FIXME: should use a single call to set_uri and return (child, metadata) (#1035)
1422
1423                         zero_length_lit = "URI:LIT:"
1424                         if noisy: self.log("%r.set_uri(%r, None, readcap=%r, overwrite=False)" %
1425                                            (parent, zero_length_lit, childname), level=NOISY)
1426                         d3.addCallback(lambda ign: parent.set_uri(childname, None, readcap=zero_length_lit,
1427                                                                   metadata=desired_metadata, overwrite=False))
1428                         def _seturi_done(child):
1429                             if noisy: self.log("%r.get_metadata_for(%r)" % (parent, childname), level=NOISY)
1430                             d4 = parent.get_metadata_for(childname)
1431                             d4.addCallback(lambda metadata: (child, metadata))
1432                             return d4
1433                         d3.addCallback(_seturi_done)
1434                     else:
1435                         if noisy: self.log("%r.get_child_and_metadata(%r)" % (parent, childname), level=NOISY)
1436                         d3.addCallback(lambda ign: parent.get_child_and_metadata(childname))
1437
1438                     def _got_child( (filenode, current_metadata) ):
1439                         if noisy: self.log("_got_child( (%r, %r) )" % (filenode, current_metadata), level=NOISY)
1440
1441                         metadata = update_metadata(current_metadata, desired_metadata, time())
1442
1443                         # Ignore the permissions of the desired_metadata in an open call. The permissions
1444                         # can only be set by setAttrs.
1445                         metadata['no-write'] = _no_write(parent_readonly, filenode, current_metadata)
1446
1447                         if filenode.is_unknown():
1448                             raise SFTPError(FX_PERMISSION_DENIED,
1449                                             "cannot open an unknown cap. Upgrading the gateway "
1450                                             "to a later Tahoe-LAFS version may help")
1451                         if not IFileNode.providedBy(filenode):
1452                             raise SFTPError(FX_PERMISSION_DENIED,
1453                                             "cannot open a directory as if it were a file")
1454                         if (flags & FXF_WRITE) and metadata['no-write']:
1455                             raise SFTPError(FX_PERMISSION_DENIED,
1456                                             "cannot open a non-writeable file for writing")
1457
1458                         return self._make_file(file, userpath, flags, parent=parent, childname=childname,
1459                                                filenode=filenode, metadata=metadata)
1460                     def _no_child(f):
1461                         if noisy: self.log("_no_child(%r)" % (f,), level=NOISY)
1462                         f.trap(NoSuchChildError)
1463
1464                         if not (flags & FXF_CREAT):
1465                             raise SFTPError(FX_NO_SUCH_FILE,
1466                                             "the file does not exist, and was not opened with the creation (CREAT) flag")
1467                         if parent_readonly:
1468                             raise SFTPError(FX_PERMISSION_DENIED,
1469                                             "cannot create a file when the parent directory is read-only")
1470
1471                         return self._make_file(file, userpath, flags, parent=parent, childname=childname)
1472                     d3.addCallbacks(_got_child, _no_child)
1473                     return d3
1474
1475                 d2.addCallback(_got_parent)
1476                 return d2
1477
1478         d.addCallback(_got_root)
1479         def _remove_on_error(err):
1480             if file:
1481                 self._remove_heisenfile(userpath, None, None, file)
1482             return err
1483         d.addErrback(_remove_on_error)
1484         d.addBoth(_convert_error, request)
1485         return d
1486
1487     def renameFile(self, from_pathstring, to_pathstring, overwrite=False):
1488         request = ".renameFile(%r, %r)" % (from_pathstring, to_pathstring)
1489         self.log(request, level=OPERATIONAL)
1490
1491         from_path = self._path_from_string(from_pathstring)
1492         to_path = self._path_from_string(to_pathstring)
1493         from_userpath = self._path_to_utf8(from_path)
1494         to_userpath = self._path_to_utf8(to_path)
1495
1496         # the target directory must already exist
1497         d = deferredutil.gatherResults([self._get_parent_or_node(from_path),
1498                                         self._get_parent_or_node(to_path)])
1499         def _got( (from_pair, to_pair) ):
1500             if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r, overwrite=%r)" %
1501                                (from_pair, to_pair, from_pathstring, to_pathstring, overwrite), level=NOISY)
1502             (from_parent, from_childname) = from_pair
1503             (to_parent, to_childname) = to_pair
1504
1505             if from_childname is None:
1506                 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename a source object specified by URI")
1507             if to_childname is None:
1508                 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename to a destination specified by URI")
1509
1510             # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.5>
1511             # "It is an error if there already exists a file with the name specified
1512             #  by newpath."
1513             # OpenSSH's SFTP server returns FX_PERMISSION_DENIED for this error.
1514             #
1515             # For the standard SSH_FXP_RENAME operation, overwrite=False.
1516             # We also support the posix-rename@openssh.com extension, which uses overwrite=True.
1517
1518             d2 = defer.succeed(None)
1519             if not overwrite:
1520                 d2.addCallback(lambda ign: to_parent.get(to_childname))
1521                 def _expect_fail(res):
1522                     if not isinstance(res, Failure):
1523                         raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1524
1525                     # It is OK if we fail for errors other than NoSuchChildError, since that probably
1526                     # indicates some problem accessing the destination directory.
1527                     res.trap(NoSuchChildError)
1528                 d2.addBoth(_expect_fail)
1529
1530             # If there are heisenfiles to be written at the 'from' direntry, then ensure
1531             # they will now be written at the 'to' direntry instead.
1532             d2.addCallback(lambda ign:
1533                            self._rename_heisenfiles(from_userpath, from_parent, from_childname,
1534                                                     to_userpath, to_parent, to_childname, overwrite=overwrite))
1535
1536             def _move(renamed):
1537                 # FIXME: use move_child_to_path to avoid possible data loss due to #943
1538                 #d3 = from_parent.move_child_to_path(from_childname, to_root, to_path, overwrite=overwrite)
1539
1540                 d3 = from_parent.move_child_to(from_childname, to_parent, to_childname, overwrite=overwrite)
1541                 def _check(err):
1542                     if noisy: self.log("_check(%r) in .renameFile(%r, %r, overwrite=%r)" %
1543                                        (err, from_pathstring, to_pathstring, overwrite), level=NOISY)
1544
1545                     if not isinstance(err, Failure) or (renamed and err.check(NoSuchChildError)):
1546                         return None
1547                     if not overwrite and err.check(ExistingChildError):
1548                         raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1549
1550                     return err
1551                 d3.addBoth(_check)
1552                 return d3
1553             d2.addCallback(_move)
1554             return d2
1555         d.addCallback(_got)
1556         d.addBoth(_convert_error, request)
1557         return d
1558
1559     def makeDirectory(self, pathstring, attrs):
1560         request = ".makeDirectory(%r, %r)" % (pathstring, attrs)
1561         self.log(request, level=OPERATIONAL)
1562
1563         path = self._path_from_string(pathstring)
1564         metadata = _attrs_to_metadata(attrs)
1565         if 'no-write' in metadata:
1566             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "cannot create a directory that is initially read-only")
1567             return defer.execute(_denied)
1568
1569         d = self._get_root(path)
1570         d.addCallback(lambda (root, path):
1571                       self._get_or_create_directories(root, path, metadata))
1572         d.addBoth(_convert_error, request)
1573         return d
1574
1575     def _get_or_create_directories(self, node, path, metadata):
1576         if not IDirectoryNode.providedBy(node):
1577             # TODO: provide the name of the blocking file in the error message.
1578             def _blocked(): raise SFTPError(FX_FAILURE, "cannot create directory because there "
1579                                                         "is a file in the way") # close enough
1580             return defer.execute(_blocked)
1581
1582         if not path:
1583             return defer.succeed(node)
1584         d = node.get(path[0])
1585         def _maybe_create(f):
1586             f.trap(NoSuchChildError)
1587             return node.create_subdirectory(path[0])
1588         d.addErrback(_maybe_create)
1589         d.addCallback(self._get_or_create_directories, path[1:], metadata)
1590         return d
1591
1592     def removeFile(self, pathstring):
1593         request = ".removeFile(%r)" % (pathstring,)
1594         self.log(request, level=OPERATIONAL)
1595
1596         path = self._path_from_string(pathstring)
1597         d = self._remove_object(path, must_be_file=True)
1598         d.addBoth(_convert_error, request)
1599         return d
1600
1601     def removeDirectory(self, pathstring):
1602         request = ".removeDirectory(%r)" % (pathstring,)
1603         self.log(request, level=OPERATIONAL)
1604
1605         path = self._path_from_string(pathstring)
1606         d = self._remove_object(path, must_be_directory=True)
1607         d.addBoth(_convert_error, request)
1608         return d
1609
1610     def _remove_object(self, path, must_be_directory=False, must_be_file=False):
1611         userpath = self._path_to_utf8(path)
1612         d = self._get_parent_or_node(path)
1613         def _got_parent( (parent, childname) ):
1614             if childname is None:
1615                 raise SFTPError(FX_NO_SUCH_FILE, "cannot remove an object specified by URI")
1616
1617             direntry = _direntry_for(parent, childname)
1618             d2 = defer.succeed(False)
1619             if not must_be_directory:
1620                 d2.addCallback(lambda ign: self._abandon_any_heisenfiles(userpath, direntry))
1621
1622             d2.addCallback(lambda abandoned:
1623                            parent.delete(childname, must_exist=not abandoned,
1624                                          must_be_directory=must_be_directory, must_be_file=must_be_file))
1625             return d2
1626         d.addCallback(_got_parent)
1627         return d
1628
1629     def openDirectory(self, pathstring):
1630         request = ".openDirectory(%r)" % (pathstring,)
1631         self.log(request, level=OPERATIONAL)
1632
1633         path = self._path_from_string(pathstring)
1634         d = self._get_parent_or_node(path)
1635         def _got_parent_or_node( (parent_or_node, childname) ):
1636             if noisy: self.log("_got_parent_or_node( (%r, %r) ) in openDirectory(%r)" %
1637                                (parent_or_node, childname, pathstring), level=NOISY)
1638             if childname is None:
1639                 return parent_or_node
1640             else:
1641                 return parent_or_node.get(childname)
1642         d.addCallback(_got_parent_or_node)
1643         def _list(dirnode):
1644             if dirnode.is_unknown():
1645                 raise SFTPError(FX_PERMISSION_DENIED,
1646                                 "cannot list an unknown cap as a directory. Upgrading the gateway "
1647                                 "to a later Tahoe-LAFS version may help")
1648             if not IDirectoryNode.providedBy(dirnode):
1649                 raise SFTPError(FX_PERMISSION_DENIED,
1650                                 "cannot list a file as if it were a directory")
1651
1652             d2 = dirnode.list()
1653             def _render(children):
1654                 parent_readonly = dirnode.is_readonly()
1655                 results = []
1656                 for filename, (child, metadata) in children.iteritems():
1657                     # The file size may be cached or absent.
1658                     metadata['no-write'] = _no_write(parent_readonly, child, metadata)
1659                     attrs = _populate_attrs(child, metadata)
1660                     filename_utf8 = filename.encode('utf-8')
1661                     longname = _lsLine(filename_utf8, attrs)
1662                     results.append( (filename_utf8, longname, attrs) )
1663                 return StoppableList(results)
1664             d2.addCallback(_render)
1665             return d2
1666         d.addCallback(_list)
1667         d.addBoth(_convert_error, request)
1668         return d
1669
1670     def getAttrs(self, pathstring, followLinks):
1671         request = ".getAttrs(%r, followLinks=%r)" % (pathstring, followLinks)
1672         self.log(request, level=OPERATIONAL)
1673
1674         # When asked about a specific file, report its current size.
1675         # TODO: the modification time for a mutable file should be
1676         # reported as the update time of the best version. But that
1677         # information isn't currently stored in mutable shares, I think.
1678
1679         path = self._path_from_string(pathstring)
1680         userpath = self._path_to_utf8(path)
1681         d = self._get_parent_or_node(path)
1682         def _got_parent_or_node( (parent_or_node, childname) ):
1683             if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
1684
1685             # Some clients will incorrectly try to get the attributes
1686             # of a file immediately after opening it, before it has been put
1687             # into the all_heisenfiles table. This is a race condition bug in
1688             # the client, but we handle it anyway by calling .sync() on all
1689             # files matching either the path or the direntry.
1690
1691             direntry = _direntry_for(parent_or_node, childname)
1692             d2 = self._sync_heisenfiles(userpath, direntry)
1693
1694             if childname is None:
1695                 node = parent_or_node
1696                 d2.addCallback(lambda ign: node.get_current_size())
1697                 d2.addCallback(lambda size:
1698                                _populate_attrs(node, {'no-write': node.is_unknown() or node.is_readonly()}, size=size))
1699             else:
1700                 parent = parent_or_node
1701                 d2.addCallback(lambda ign: parent.get_child_and_metadata_at_path([childname]))
1702                 def _got( (child, metadata) ):
1703                     if noisy: self.log("_got( (%r, %r) )" % (child, metadata), level=NOISY)
1704                     _assert(IDirectoryNode.providedBy(parent), parent=parent)
1705                     metadata['no-write'] = _no_write(parent.is_readonly(), child, metadata)
1706                     d3 = child.get_current_size()
1707                     d3.addCallback(lambda size: _populate_attrs(child, metadata, size=size))
1708                     return d3
1709                 def _nosuch(err):
1710                     if noisy: self.log("_nosuch(%r)" % (err,), level=NOISY)
1711                     err.trap(NoSuchChildError)
1712                     if noisy: self.log("checking open files:\nself._heisenfiles = %r\nall_heisenfiles = %r\ndirentry=%r" %
1713                                        (self._heisenfiles, all_heisenfiles, direntry), level=NOISY)
1714                     if direntry in all_heisenfiles:
1715                         files = all_heisenfiles[direntry]
1716                         if len(files) == 0:  # pragma: no cover
1717                             return err
1718                         # use the heisenfile that was most recently opened
1719                         return files[-1].getAttrs()
1720                     return err
1721                 d2.addCallbacks(_got, _nosuch)
1722             return d2
1723         d.addCallback(_got_parent_or_node)
1724         d.addBoth(_convert_error, request)
1725         return d
1726
1727     def setAttrs(self, pathstring, attrs):
1728         request = ".setAttrs(%r, %r)" % (pathstring, attrs)
1729         self.log(request, level=OPERATIONAL)
1730
1731         if "size" in attrs:
1732             # this would require us to download and re-upload the truncated/extended
1733             # file contents
1734             def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "setAttrs wth size attribute unsupported")
1735             return defer.execute(_unsupported)
1736
1737         path = self._path_from_string(pathstring)
1738         userpath = self._path_to_utf8(path)
1739         d = self._get_parent_or_node(path)
1740         def _got_parent_or_node( (parent_or_node, childname) ):
1741             if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
1742
1743             direntry = _direntry_for(parent_or_node, childname)
1744             d2 = self._update_attrs_for_heisenfiles(userpath, direntry, attrs)
1745
1746             def _update(updated_heisenfiles):
1747                 if childname is None:
1748                     if updated_heisenfiles:
1749                         return None
1750                     raise SFTPError(FX_NO_SUCH_FILE, userpath)
1751                 else:
1752                     desired_metadata = _attrs_to_metadata(attrs)
1753                     if noisy: self.log("desired_metadata = %r" % (desired_metadata,), level=NOISY)
1754
1755                     d3 = parent_or_node.set_metadata_for(childname, desired_metadata)
1756                     def _nosuch(err):
1757                         if updated_heisenfiles:
1758                             err.trap(NoSuchChildError)
1759                         else:
1760                             return err
1761                     d3.addErrback(_nosuch)
1762                     return d3
1763             d2.addCallback(_update)
1764             d2.addCallback(lambda ign: None)
1765             return d2
1766         d.addCallback(_got_parent_or_node)
1767         d.addBoth(_convert_error, request)
1768         return d
1769
1770     def readLink(self, pathstring):
1771         self.log(".readLink(%r)" % (pathstring,), level=OPERATIONAL)
1772
1773         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "readLink")
1774         return defer.execute(_unsupported)
1775
1776     def makeLink(self, linkPathstring, targetPathstring):
1777         self.log(".makeLink(%r, %r)" % (linkPathstring, targetPathstring), level=OPERATIONAL)
1778
1779         # If this is implemented, note the reversal of arguments described in point 7 of
1780         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>.
1781
1782         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "makeLink")
1783         return defer.execute(_unsupported)
1784
1785     def extendedRequest(self, extensionName, extensionData):
1786         self.log(".extendedRequest(%r, <data of length %r>)" % (extensionName, len(extensionData)), level=OPERATIONAL)
1787
1788         # We implement the three main OpenSSH SFTP extensions; see
1789         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
1790
1791         if extensionName == 'posix-rename@openssh.com':
1792             def _bad(): raise SFTPError(FX_BAD_MESSAGE, "could not parse posix-rename@openssh.com request")
1793
1794             if 4 > len(extensionData): return defer.execute(_bad)
1795             (fromPathLen,) = struct.unpack('>L', extensionData[0:4])
1796             if 8 + fromPathLen > len(extensionData): return defer.execute(_bad)
1797
1798             (toPathLen,) = struct.unpack('>L', extensionData[(4 + fromPathLen):(8 + fromPathLen)])
1799             if 8 + fromPathLen + toPathLen != len(extensionData): return defer.execute(_bad)
1800
1801             fromPathstring = extensionData[4:(4 + fromPathLen)]
1802             toPathstring = extensionData[(8 + fromPathLen):]
1803             d = self.renameFile(fromPathstring, toPathstring, overwrite=True)
1804
1805             # Twisted conch assumes that the response from an extended request is either
1806             # an error, or an FXP_EXTENDED_REPLY. But it happens to do the right thing
1807             # (respond with an FXP_STATUS message) if we return a Failure with code FX_OK.
1808             def _succeeded(ign):
1809                 raise SFTPError(FX_OK, "request succeeded")
1810             d.addCallback(_succeeded)
1811             return d
1812
1813         if extensionName == 'statvfs@openssh.com' or extensionName == 'fstatvfs@openssh.com':
1814             # f_bsize and f_frsize should be the same to avoid a bug in 'df'
1815             return defer.succeed(struct.pack('>11Q',
1816                 1024,         # uint64  f_bsize     /* file system block size */
1817                 1024,         # uint64  f_frsize    /* fundamental fs block size */
1818                 628318530,    # uint64  f_blocks    /* number of blocks (unit f_frsize) */
1819                 314159265,    # uint64  f_bfree     /* free blocks in file system */
1820                 314159265,    # uint64  f_bavail    /* free blocks for non-root */
1821                 200000000,    # uint64  f_files     /* total file inodes */
1822                 100000000,    # uint64  f_ffree     /* free file inodes */
1823                 100000000,    # uint64  f_favail    /* free file inodes for non-root */
1824                 0x1AF5,       # uint64  f_fsid      /* file system id */
1825                 2,            # uint64  f_flag      /* bit mask = ST_NOSUID; not ST_RDONLY */
1826                 65535,        # uint64  f_namemax   /* maximum filename length */
1827                 ))
1828
1829         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "unsupported %r request <data of length %r>" %
1830                                                                (extensionName, len(extensionData)))
1831         return defer.execute(_unsupported)
1832
1833     def realPath(self, pathstring):
1834         self.log(".realPath(%r)" % (pathstring,), level=OPERATIONAL)
1835
1836         return self._path_to_utf8(self._path_from_string(pathstring))
1837
1838     def _path_to_utf8(self, path):
1839         return (u"/" + u"/".join(path)).encode('utf-8')
1840
1841     def _path_from_string(self, pathstring):
1842         if noisy: self.log("CONVERT %r" % (pathstring,), level=NOISY)
1843
1844         _assert(isinstance(pathstring, str), pathstring=pathstring)
1845
1846         # The home directory is the root directory.
1847         pathstring = pathstring.strip("/")
1848         if pathstring == "" or pathstring == ".":
1849             path_utf8 = []
1850         else:
1851             path_utf8 = pathstring.split("/")
1852
1853         # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.2>
1854         # "Servers SHOULD interpret a path name component ".." as referring to
1855         #  the parent directory, and "." as referring to the current directory."
1856         path = []
1857         for p_utf8 in path_utf8:
1858             if p_utf8 == "..":
1859                 # ignore excess .. components at the root
1860                 if len(path) > 0:
1861                     path = path[:-1]
1862             elif p_utf8 != ".":
1863                 try:
1864                     p = p_utf8.decode('utf-8', 'strict')
1865                 except UnicodeError:
1866                     raise SFTPError(FX_NO_SUCH_FILE, "path could not be decoded as UTF-8")
1867                 path.append(p)
1868
1869         if noisy: self.log(" PATH %r" % (path,), level=NOISY)
1870         return path
1871
1872     def _get_root(self, path):
1873         # return Deferred (root, remaining_path)
1874         d = defer.succeed(None)
1875         if path and path[0] == u"uri":
1876             d.addCallback(lambda ign: self._client.create_node_from_uri(path[1].encode('utf-8')))
1877             d.addCallback(lambda root: (root, path[2:]))
1878         else:
1879             d.addCallback(lambda ign: (self._root, path))
1880         return d
1881
1882     def _get_parent_or_node(self, path):
1883         # return Deferred (parent, childname) or (node, None)
1884         d = self._get_root(path)
1885         def _got_root( (root, remaining_path) ):
1886             if not remaining_path:
1887                 return (root, None)
1888             else:
1889                 d2 = root.get_child_at_path(remaining_path[:-1])
1890                 d2.addCallback(lambda parent: (parent, remaining_path[-1]))
1891                 return d2
1892         d.addCallback(_got_root)
1893         return d
1894
1895
1896 class FakeTransport:
1897     implements(ITransport)
1898     def write(self, data):
1899         logmsg("FakeTransport.write(<data of length %r>)" % (len(data),), level=NOISY)
1900
1901     def writeSequence(self, data):
1902         logmsg("FakeTransport.writeSequence(...)", level=NOISY)
1903
1904     def loseConnection(self):
1905         logmsg("FakeTransport.loseConnection()", level=NOISY)
1906
1907     # getPeer and getHost can just raise errors, since we don't know what to return
1908
1909
1910 class ShellSession(PrefixingLogMixin):
1911     implements(ISession)
1912     def __init__(self, userHandler):
1913         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
1914         if noisy: self.log(".__init__(%r)" % (userHandler), level=NOISY)
1915
1916     def getPty(self, terminal, windowSize, attrs):
1917         self.log(".getPty(%r, %r, %r)" % (terminal, windowSize, attrs), level=OPERATIONAL)
1918
1919     def openShell(self, protocol):
1920         self.log(".openShell(%r)" % (protocol,), level=OPERATIONAL)
1921         if hasattr(protocol, 'transport') and protocol.transport is None:
1922             protocol.transport = FakeTransport()  # work around Twisted bug
1923
1924         return self._unsupported(protocol)
1925
1926     def execCommand(self, protocol, cmd):
1927         self.log(".execCommand(%r, %r)" % (protocol, cmd), level=OPERATIONAL)
1928         if hasattr(protocol, 'transport') and protocol.transport is None:
1929             protocol.transport = FakeTransport()  # work around Twisted bug
1930
1931         d = defer.succeed(None)
1932         if cmd == "df -P -k /":
1933             d.addCallback(lambda ign: protocol.write(
1934                           "Filesystem         1024-blocks      Used Available Capacity Mounted on\r\n"
1935                           "tahoe                628318530 314159265 314159265      50% /\r\n"))
1936             d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessDone(None))))
1937         else:
1938             d.addCallback(lambda ign: self._unsupported(protocol))
1939         return d
1940
1941     def _unsupported(self, protocol):
1942         d = defer.succeed(None)
1943         d.addCallback(lambda ign: protocol.errReceived(
1944                       "This server supports only the SFTP protocol. It does not support SCP,\r\n"
1945                       "interactive shell sessions, or commands other than one needed by sshfs.\r\n"))
1946         d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessTerminated(exitCode=1))))
1947         return d
1948
1949     def windowChanged(self, newWindowSize):
1950         self.log(".windowChanged(%r)" % (newWindowSize,), level=OPERATIONAL)
1951
1952     def eofReceived(self):
1953         self.log(".eofReceived()", level=OPERATIONAL)
1954
1955     def closed(self):
1956         self.log(".closed()", level=OPERATIONAL)
1957
1958
1959 # If you have an SFTPUserHandler and want something that provides ISession, you get
1960 # ShellSession(userHandler).
1961 # We use adaptation because this must be a different object to the SFTPUserHandler.
1962 components.registerAdapter(ShellSession, SFTPUserHandler, ISession)
1963
1964
1965 from allmydata.frontends.auth import AccountURLChecker, AccountFileChecker, NeedRootcapLookupScheme
1966
1967 class Dispatcher:
1968     implements(portal.IRealm)
1969     def __init__(self, client):
1970         self._client = client
1971
1972     def requestAvatar(self, avatarID, mind, interface):
1973         _assert(interface == IConchUser, interface=interface)
1974         rootnode = self._client.create_node_from_uri(avatarID.rootcap)
1975         handler = SFTPUserHandler(self._client, rootnode, avatarID.username)
1976         return (interface, handler, handler.logout)
1977
1978
1979 class SFTPServer(service.MultiService):
1980     def __init__(self, client, accountfile, accounturl,
1981                  sftp_portstr, pubkey_file, privkey_file):
1982         service.MultiService.__init__(self)
1983
1984         r = Dispatcher(client)
1985         p = portal.Portal(r)
1986
1987         if accountfile:
1988             c = AccountFileChecker(self, accountfile)
1989             p.registerChecker(c)
1990         if accounturl:
1991             c = AccountURLChecker(self, accounturl)
1992             p.registerChecker(c)
1993         if not accountfile and not accounturl:
1994             # we could leave this anonymous, with just the /uri/CAP form
1995             raise NeedRootcapLookupScheme("must provide an account file or URL")
1996
1997         pubkey = keys.Key.fromFile(pubkey_file)
1998         privkey = keys.Key.fromFile(privkey_file)
1999         class SSHFactory(factory.SSHFactory):
2000             publicKeys = {pubkey.sshType(): pubkey}
2001             privateKeys = {privkey.sshType(): privkey}
2002             def getPrimes(self):
2003                 try:
2004                     # if present, this enables diffie-hellman-group-exchange
2005                     return primes.parseModuliFile("/etc/ssh/moduli")
2006                 except IOError:
2007                     return None
2008
2009         f = SSHFactory()
2010         f.portal = p
2011
2012         s = strports.service(sftp_portstr, f)
2013         s.setServiceParent(self)