]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/sftpd.py
Add a test, add missing imports. refs #2388
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / sftpd.py
1
2 import heapq, traceback, array, stat, struct
3 from types import NoneType
4 from stat import S_IFREG, S_IFDIR
5 from time import time, strftime, localtime
6
7 from zope.interface import implements
8 from twisted.python import components
9 from twisted.application import service, strports
10 from twisted.conch.ssh import factory, keys, session
11 from twisted.conch.ssh.filetransfer import FileTransferServer, SFTPError, \
12      FX_NO_SUCH_FILE, FX_OP_UNSUPPORTED, FX_PERMISSION_DENIED, FX_EOF, \
13      FX_BAD_MESSAGE, FX_FAILURE, FX_OK
14 from twisted.conch.ssh.filetransfer import FXF_READ, FXF_WRITE, FXF_APPEND, \
15      FXF_CREAT, FXF_TRUNC, FXF_EXCL
16 from twisted.conch.interfaces import ISFTPServer, ISFTPFile, IConchUser, ISession
17 from twisted.conch.avatar import ConchUser
18 from twisted.conch.openssh_compat import primes
19 from twisted.cred import portal
20 from twisted.internet.error import ProcessDone, ProcessTerminated
21 from twisted.python.failure import Failure
22 from twisted.internet.interfaces import ITransport
23
24 from twisted.internet import defer
25 from twisted.internet.interfaces import IConsumer
26 from foolscap.api import eventually
27 from allmydata.util import deferredutil
28
29 from allmydata.util.assertutil import _assert, precondition
30 from allmydata.util.consumer import download_to_data
31 from allmydata.util.encodingutil import get_filesystem_encoding
32 from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
33      NoSuchChildError, ChildOfWrongTypeError
34 from allmydata.mutable.common import NotWriteableError
35 from allmydata.mutable.publish import MutableFileHandle
36 from allmydata.immutable.upload import FileHandle
37 from allmydata.dirnode import update_metadata
38 from allmydata.util.fileutil import EncryptedTemporaryFile
39
40 noisy = True
41 use_foolscap_logging = True
42
43 from allmydata.util.log import NOISY, OPERATIONAL, WEIRD, \
44     msg as _msg, err as _err, PrefixingLogMixin as _PrefixingLogMixin
45
46 if use_foolscap_logging:
47     (logmsg, logerr, PrefixingLogMixin) = (_msg, _err, _PrefixingLogMixin)
48 else:  # pragma: no cover
49     def logmsg(s, level=None):
50         print s
51     def logerr(s, level=None):
52         print s
53     class PrefixingLogMixin:
54         def __init__(self, facility=None, prefix=''):
55             self.prefix = prefix
56         def log(self, s, level=None):
57             print "%r %s" % (self.prefix, s)
58
59
60 def eventually_callback(d):
61     return lambda res: eventually(d.callback, res)
62
63 def eventually_errback(d):
64     return lambda err: eventually(d.errback, err)
65
66
67 def _utf8(x):
68     if isinstance(x, unicode):
69         return x.encode('utf-8')
70     if isinstance(x, str):
71         return x
72     return repr(x)
73
74
75 def _to_sftp_time(t):
76     """SFTP times are unsigned 32-bit integers representing UTC seconds
77     (ignoring leap seconds) since the Unix epoch, January 1 1970 00:00 UTC.
78     A Tahoe time is the corresponding float."""
79     return long(t) & 0xFFFFFFFFL
80
81
82 def _convert_error(res, request):
83     """If res is not a Failure, return it, otherwise reraise the appropriate
84     SFTPError."""
85
86     if not isinstance(res, Failure):
87         logged_res = res
88         if isinstance(res, str): logged_res = "<data of length %r>" % (len(res),)
89         logmsg("SUCCESS %r %r" % (request, logged_res,), level=OPERATIONAL)
90         return res
91
92     err = res
93     logmsg("RAISE %r %r" % (request, err.value), level=OPERATIONAL)
94     try:
95         if noisy: logmsg(traceback.format_exc(err.value), level=NOISY)
96     except Exception:  # pragma: no cover
97         pass
98
99     # The message argument to SFTPError must not reveal information that
100     # might compromise anonymity, if we are running over an anonymous network.
101
102     if err.check(SFTPError):
103         # original raiser of SFTPError has responsibility to ensure anonymity
104         raise err
105     if err.check(NoSuchChildError):
106         childname = _utf8(err.value.args[0])
107         raise SFTPError(FX_NO_SUCH_FILE, childname)
108     if err.check(NotWriteableError) or err.check(ChildOfWrongTypeError):
109         msg = _utf8(err.value.args[0])
110         raise SFTPError(FX_PERMISSION_DENIED, msg)
111     if err.check(ExistingChildError):
112         # Versions of SFTP after v3 (which is what twisted.conch implements)
113         # define a specific error code for this case: FX_FILE_ALREADY_EXISTS.
114         # However v3 doesn't; instead, other servers such as sshd return
115         # FX_FAILURE. The gvfs SFTP backend, for example, depends on this
116         # to translate the error to the equivalent of POSIX EEXIST, which is
117         # necessary for some picky programs (such as gedit).
118         msg = _utf8(err.value.args[0])
119         raise SFTPError(FX_FAILURE, msg)
120     if err.check(NotImplementedError):
121         raise SFTPError(FX_OP_UNSUPPORTED, _utf8(err.value))
122     if err.check(EOFError):
123         raise SFTPError(FX_EOF, "end of file reached")
124     if err.check(defer.FirstError):
125         _convert_error(err.value.subFailure, request)
126
127     # We assume that the error message is not anonymity-sensitive.
128     raise SFTPError(FX_FAILURE, _utf8(err.value))
129
130
131 def _repr_flags(flags):
132     return "|".join([f for f in
133                      [(flags & FXF_READ)   and "FXF_READ"   or None,
134                       (flags & FXF_WRITE)  and "FXF_WRITE"  or None,
135                       (flags & FXF_APPEND) and "FXF_APPEND" or None,
136                       (flags & FXF_CREAT)  and "FXF_CREAT"  or None,
137                       (flags & FXF_TRUNC)  and "FXF_TRUNC"  or None,
138                       (flags & FXF_EXCL)   and "FXF_EXCL"   or None,
139                      ]
140                      if f])
141
142
143 def _lsLine(name, attrs):
144     st_uid = "tahoe"
145     st_gid = "tahoe"
146     st_mtime = attrs.get("mtime", 0)
147     st_mode = attrs["permissions"]
148
149     # Some clients won't tolerate '?' in the size field (#1337).
150     st_size = attrs.get("size", 0)
151
152     # We don't know how many links there really are to this object.
153     st_nlink = 1
154
155     # Based on <https://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
156     # We previously could not call the version in Twisted because we needed the change
157     # <https://twistedmatrix.com/trac/changeset/25412> (released in Twisted v8.2).
158     # Since we now depend on Twisted v10.1, consider calling Twisted's version.
159
160     mode = st_mode
161     perms = array.array('c', '-'*10)
162     ft = stat.S_IFMT(mode)
163     if   stat.S_ISDIR(ft): perms[0] = 'd'
164     elif stat.S_ISREG(ft): perms[0] = '-'
165     else: perms[0] = '?'
166     # user
167     if mode&stat.S_IRUSR: perms[1] = 'r'
168     if mode&stat.S_IWUSR: perms[2] = 'w'
169     if mode&stat.S_IXUSR: perms[3] = 'x'
170     # group
171     if mode&stat.S_IRGRP: perms[4] = 'r'
172     if mode&stat.S_IWGRP: perms[5] = 'w'
173     if mode&stat.S_IXGRP: perms[6] = 'x'
174     # other
175     if mode&stat.S_IROTH: perms[7] = 'r'
176     if mode&stat.S_IWOTH: perms[8] = 'w'
177     if mode&stat.S_IXOTH: perms[9] = 'x'
178     # suid/sgid never set
179
180     l = perms.tostring()
181     l += str(st_nlink).rjust(5) + ' '
182     un = str(st_uid)
183     l += un.ljust(9)
184     gr = str(st_gid)
185     l += gr.ljust(9)
186     sz = str(st_size)
187     l += sz.rjust(8)
188     l += ' '
189     day = 60 * 60 * 24
190     sixmo = day * 7 * 26
191     now = time()
192     if st_mtime + sixmo < now or st_mtime > now + day:
193         # mtime is more than 6 months ago, or more than one day in the future
194         l += strftime("%b %d  %Y ", localtime(st_mtime))
195     else:
196         l += strftime("%b %d %H:%M ", localtime(st_mtime))
197     l += name
198     return l
199
200
201 def _no_write(parent_readonly, child, metadata=None):
202     """Whether child should be listed as having read-only permissions in parent."""
203
204     if child.is_unknown():
205         return True
206     elif child.is_mutable():
207         return child.is_readonly()
208     elif parent_readonly or IDirectoryNode.providedBy(child):
209         return True
210     else:
211         return metadata is not None and metadata.get('no-write', False)
212
213
214 def _populate_attrs(childnode, metadata, size=None):
215     attrs = {}
216
217     # The permissions must have the S_IFDIR (040000) or S_IFREG (0100000)
218     # bits, otherwise the client may refuse to open a directory.
219     # Also, sshfs run as a non-root user requires files and directories
220     # to be world-readable/writeable.
221     # It is important that we never set the executable bits on files.
222     #
223     # Directories and unknown nodes have no size, and SFTP doesn't
224     # require us to make one up.
225     #
226     # childnode might be None, meaning that the file doesn't exist yet,
227     # but we're going to write it later.
228
229     if childnode and childnode.is_unknown():
230         perms = 0
231     elif childnode and IDirectoryNode.providedBy(childnode):
232         perms = S_IFDIR | 0777
233     else:
234         # For files, omit the size if we don't immediately know it.
235         if childnode and size is None:
236             size = childnode.get_size()
237         if size is not None:
238             _assert(isinstance(size, (int, long)) and not isinstance(size, bool), size=size)
239             attrs['size'] = size
240         perms = S_IFREG | 0666
241
242     if metadata:
243         if metadata.get('no-write', False):
244             perms &= S_IFDIR | S_IFREG | 0555  # clear 'w' bits
245
246         # See webapi.txt for what these times mean.
247         # We would prefer to omit atime, but SFTP version 3 can only
248         # accept mtime if atime is also set.
249         if 'linkmotime' in metadata.get('tahoe', {}):
250             attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['tahoe']['linkmotime'])
251         elif 'mtime' in metadata:
252             attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['mtime'])
253
254         if 'linkcrtime' in metadata.get('tahoe', {}):
255             attrs['createtime'] = _to_sftp_time(metadata['tahoe']['linkcrtime'])
256
257     attrs['permissions'] = perms
258
259     # twisted.conch.ssh.filetransfer only implements SFTP version 3,
260     # which doesn't include SSH_FILEXFER_ATTR_FLAGS.
261
262     return attrs
263
264
265 def _attrs_to_metadata(attrs):
266     metadata = {}
267
268     for key in attrs:
269         if key == "mtime" or key == "ctime" or key == "createtime":
270             metadata[key] = long(attrs[key])
271         elif key.startswith("ext_"):
272             metadata[key] = str(attrs[key])
273
274     perms = attrs.get('permissions', stat.S_IWUSR)
275     if not (perms & stat.S_IWUSR):
276         metadata['no-write'] = True
277
278     return metadata
279
280
281 def _direntry_for(filenode_or_parent, childname, filenode=None):
282     precondition(isinstance(childname, (unicode, NoneType)), childname=childname)
283
284     if childname is None:
285         filenode_or_parent = filenode
286
287     if filenode_or_parent:
288         rw_uri = filenode_or_parent.get_write_uri()
289         if rw_uri and childname:
290             return rw_uri + "/" + childname.encode('utf-8')
291         else:
292             return rw_uri
293
294     return None
295
296
297 class OverwriteableFileConsumer(PrefixingLogMixin):
298     implements(IConsumer)
299     """I act both as a consumer for the download of the original file contents, and as a
300     wrapper for a temporary file that records the downloaded data and any overwrites.
301     I use a priority queue to keep track of which regions of the file have been overwritten
302     but not yet downloaded, so that the download does not clobber overwritten data.
303     I use another priority queue to record milestones at which to make callbacks
304     indicating that a given number of bytes have been downloaded.
305
306     The temporary file reflects the contents of the file that I represent, except that:
307      - regions that have neither been downloaded nor overwritten, if present,
308        contain garbage.
309      - the temporary file may be shorter than the represented file (it is never longer).
310        The latter's current size is stored in self.current_size.
311
312     This abstraction is mostly independent of SFTP. Consider moving it, if it is found
313     useful for other frontends."""
314
315     def __init__(self, download_size, tempfile_maker):
316         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
317         if noisy: self.log(".__init__(%r, %r)" % (download_size, tempfile_maker), level=NOISY)
318         self.download_size = download_size
319         self.current_size = download_size
320         self.f = tempfile_maker()
321         self.downloaded = 0
322         self.milestones = []  # empty heap of (offset, d)
323         self.overwrites = []  # empty heap of (start, end)
324         self.is_closed = False
325
326         self.done = defer.Deferred()
327         self.done_status = None  # None -> not complete, Failure -> download failed, str -> download succeeded
328         self.producer = None
329
330     def get_file(self):
331         return self.f
332
333     def get_current_size(self):
334         return self.current_size
335
336     def set_current_size(self, size):
337         if noisy: self.log(".set_current_size(%r), current_size = %r, downloaded = %r" %
338                            (size, self.current_size, self.downloaded), level=NOISY)
339         if size < self.current_size or size < self.downloaded:
340             self.f.truncate(size)
341         if size > self.current_size:
342             self.overwrite(self.current_size, "\x00" * (size - self.current_size))
343         self.current_size = size
344
345         # make the invariant self.download_size <= self.current_size be true again
346         if size < self.download_size:
347             self.download_size = size
348
349         if self.downloaded >= self.download_size:
350             self.download_done("size changed")
351
352     def registerProducer(self, p, streaming):
353         if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY)
354         if self.producer is not None:
355             raise RuntimeError("producer is already registered")
356
357         self.producer = p
358         if streaming:
359             # call resumeProducing once to start things off
360             p.resumeProducing()
361         else:
362             def _iterate():
363                 if self.done_status is None:
364                     p.resumeProducing()
365                     eventually(_iterate)
366             _iterate()
367
368     def write(self, data):
369         if noisy: self.log(".write(<data of length %r>)" % (len(data),), level=NOISY)
370         if self.is_closed:
371             return
372
373         if self.downloaded >= self.download_size:
374             return
375
376         next_downloaded = self.downloaded + len(data)
377         if next_downloaded > self.download_size:
378             data = data[:(self.download_size - self.downloaded)]
379
380         while len(self.overwrites) > 0:
381             (start, end) = self.overwrites[0]
382             if start >= next_downloaded:
383                 # This and all remaining overwrites are after the data we just downloaded.
384                 break
385             if start > self.downloaded:
386                 # The data we just downloaded has been partially overwritten.
387                 # Write the prefix of it that precedes the overwritten region.
388                 self.f.seek(self.downloaded)
389                 self.f.write(data[:(start - self.downloaded)])
390
391             # This merges consecutive overwrites if possible, which allows us to detect the
392             # case where the download can be stopped early because the remaining region
393             # to download has already been fully overwritten.
394             heapq.heappop(self.overwrites)
395             while len(self.overwrites) > 0:
396                 (start1, end1) = self.overwrites[0]
397                 if start1 > end:
398                     break
399                 end = end1
400                 heapq.heappop(self.overwrites)
401
402             if end >= next_downloaded:
403                 # This overwrite extends past the downloaded data, so there is no
404                 # more data to consider on this call.
405                 heapq.heappush(self.overwrites, (next_downloaded, end))
406                 self._update_downloaded(next_downloaded)
407                 return
408             elif end >= self.downloaded:
409                 data = data[(end - self.downloaded):]
410                 self._update_downloaded(end)
411
412         self.f.seek(self.downloaded)
413         self.f.write(data)
414         self._update_downloaded(next_downloaded)
415
416     def _update_downloaded(self, new_downloaded):
417         self.downloaded = new_downloaded
418         milestone = new_downloaded
419         if len(self.overwrites) > 0:
420             (start, end) = self.overwrites[0]
421             if start <= new_downloaded and end > milestone:
422                 milestone = end
423
424         while len(self.milestones) > 0:
425             (next, d) = self.milestones[0]
426             if next > milestone:
427                 return
428             if noisy: self.log("MILESTONE %r %r" % (next, d), level=NOISY)
429             heapq.heappop(self.milestones)
430             eventually_callback(d)("reached")
431
432         if milestone >= self.download_size:
433             self.download_done("reached download size")
434
435     def overwrite(self, offset, data):
436         if noisy: self.log(".overwrite(%r, <data of length %r>)" % (offset, len(data)), level=NOISY)
437         if self.is_closed:
438             self.log("overwrite called on a closed OverwriteableFileConsumer", level=WEIRD)
439             raise SFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")
440
441         if offset > self.current_size:
442             # Normally writing at an offset beyond the current end-of-file
443             # would leave a hole that appears filled with zeroes. However, an
444             # EncryptedTemporaryFile doesn't behave like that (if there is a
445             # hole in the file on disk, the zeroes that are read back will be
446             # XORed with the keystream). So we must explicitly write zeroes in
447             # the gap between the current EOF and the offset.
448
449             self.f.seek(self.current_size)
450             self.f.write("\x00" * (offset - self.current_size))
451             start = self.current_size
452         else:
453             self.f.seek(offset)
454             start = offset
455
456         self.f.write(data)
457         end = offset + len(data)
458         self.current_size = max(self.current_size, end)
459         if end > self.downloaded:
460             heapq.heappush(self.overwrites, (start, end))
461
462     def read(self, offset, length):
463         """When the data has been read, callback the Deferred that we return with this data.
464         Otherwise errback the Deferred that we return.
465         The caller must perform no more overwrites until the Deferred has fired."""
466
467         if noisy: self.log(".read(%r, %r), current_size = %r" % (offset, length, self.current_size), level=NOISY)
468         if self.is_closed:
469             self.log("read called on a closed OverwriteableFileConsumer", level=WEIRD)
470             raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
471
472         # Note that the overwrite method is synchronous. When a write request is processed
473         # (e.g. a writeChunk request on the async queue of GeneralSFTPFile), overwrite will
474         # be called and will update self.current_size if necessary before returning. Therefore,
475         # self.current_size will be up-to-date for a subsequent call to this read method, and
476         # so it is correct to do the check for a read past the end-of-file here.
477         if offset >= self.current_size:
478             def _eof(): raise EOFError("read past end of file")
479             return defer.execute(_eof)
480
481         if offset + length > self.current_size:
482             length = self.current_size - offset
483             if noisy: self.log("truncating read to %r bytes" % (length,), level=NOISY)
484
485         needed = min(offset + length, self.download_size)
486
487         # If we fail to reach the needed number of bytes, the read request will fail.
488         d = self.when_reached_or_failed(needed)
489         def _reached_in_read(res):
490             # It is not necessarily the case that self.downloaded >= needed, because
491             # the file might have been truncated (thus truncating the download) and
492             # then extended.
493
494             _assert(self.current_size >= offset + length,
495                     current_size=self.current_size, offset=offset, length=length)
496             if noisy: self.log("_reached_in_read(%r), self.f = %r" % (res, self.f,), level=NOISY)
497             self.f.seek(offset)
498             return self.f.read(length)
499         d.addCallback(_reached_in_read)
500         return d
501
502     def when_reached_or_failed(self, index):
503         if noisy: self.log(".when_reached_or_failed(%r)" % (index,), level=NOISY)
504         def _reached(res):
505             if noisy: self.log("reached %r with result %r" % (index, res), level=NOISY)
506             return res
507
508         if self.done_status is not None:
509             return defer.execute(_reached, self.done_status)
510         if index <= self.downloaded:  # already reached successfully
511             if noisy: self.log("already reached %r successfully" % (index,), level=NOISY)
512             return defer.succeed("already reached successfully")
513         d = defer.Deferred()
514         d.addCallback(_reached)
515         heapq.heappush(self.milestones, (index, d))
516         return d
517
518     def when_done(self):
519         d = defer.Deferred()
520         self.done.addCallback(lambda ign: eventually_callback(d)(self.done_status))
521         return d
522
523     def download_done(self, res):
524         _assert(isinstance(res, (str, Failure)), res=res)
525         # Only the first call to download_done counts, but we log subsequent calls
526         # (multiple calls are normal).
527         if self.done_status is not None:
528             self.log("IGNORING extra call to download_done with result %r; previous result was %r"
529                      % (res, self.done_status), level=OPERATIONAL)
530             return
531
532         self.log("DONE with result %r" % (res,), level=OPERATIONAL)
533
534         # We avoid errbacking self.done so that we are not left with an 'Unhandled error in Deferred'
535         # in case when_done() is never called. Instead we stash the failure in self.done_status,
536         # from where the callback added in when_done() can retrieve it.
537         self.done_status = res
538         eventually_callback(self.done)(None)
539
540         while len(self.milestones) > 0:
541             (next, d) = self.milestones[0]
542             if noisy: self.log("MILESTONE FINISH %r %r %r" % (next, d, res), level=NOISY)
543             heapq.heappop(self.milestones)
544             # The callback means that the milestone has been reached if
545             # it is ever going to be. Note that the file may have been
546             # truncated to before the milestone.
547             eventually_callback(d)(res)
548
549     def close(self):
550         if not self.is_closed:
551             self.is_closed = True
552             try:
553                 self.f.close()
554             except Exception, e:
555                 self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
556         self.download_done("closed")
557         return self.done_status
558
559     def unregisterProducer(self):
560         # This will happen just before our client calls download_done, which will tell
561         # us the outcome of the download; we don't know the outcome at this point.
562         self.producer = None
563         self.log("producer unregistered", level=NOISY)
564
565
566 SIZE_THRESHOLD = 1000
567
568
569 class ShortReadOnlySFTPFile(PrefixingLogMixin):
570     implements(ISFTPFile)
571     """I represent a file handle to a particular file on an SFTP connection.
572     I am used only for short immutable files opened in read-only mode.
573     When I am created, the file contents start to be downloaded to memory.
574     self.async is used to delay read requests until the download has finished."""
575
576     def __init__(self, userpath, filenode, metadata):
577         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
578         if noisy: self.log(".__init__(%r, %r, %r)" % (userpath, filenode, metadata), level=NOISY)
579
580         precondition(isinstance(userpath, str) and IFileNode.providedBy(filenode),
581                      userpath=userpath, filenode=filenode)
582         self.filenode = filenode
583         self.metadata = metadata
584         self.async = download_to_data(filenode)
585         self.closed = False
586
587     def readChunk(self, offset, length):
588         request = ".readChunk(%r, %r)" % (offset, length)
589         self.log(request, level=OPERATIONAL)
590
591         if self.closed:
592             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
593             return defer.execute(_closed)
594
595         d = defer.Deferred()
596         def _read(data):
597             if noisy: self.log("_read(<data of length %r>) in readChunk(%r, %r)" % (len(data), offset, length), level=NOISY)
598
599             # "In response to this request, the server will read as many bytes as it
600             #  can from the file (up to 'len'), and return them in a SSH_FXP_DATA
601             #  message.  If an error occurs or EOF is encountered before reading any
602             #  data, the server will respond with SSH_FXP_STATUS.  For normal disk
603             #  files, it is guaranteed that this will read the specified number of
604             #  bytes, or up to end of file."
605             #
606             # i.e. we respond with an EOF error iff offset is already at EOF.
607
608             if offset >= len(data):
609                 eventually_errback(d)(Failure(SFTPError(FX_EOF, "read at or past end of file")))
610             else:
611                 eventually_callback(d)(data[offset:offset+length])  # truncated if offset+length > len(data)
612             return data
613         self.async.addCallbacks(_read, eventually_errback(d))
614         d.addBoth(_convert_error, request)
615         return d
616
617     def writeChunk(self, offset, data):
618         self.log(".writeChunk(%r, <data of length %r>) denied" % (offset, len(data)), level=OPERATIONAL)
619
620         def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
621         return defer.execute(_denied)
622
623     def close(self):
624         self.log(".close()", level=OPERATIONAL)
625
626         self.closed = True
627         return defer.succeed(None)
628
629     def getAttrs(self):
630         request = ".getAttrs()"
631         self.log(request, level=OPERATIONAL)
632
633         if self.closed:
634             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
635             return defer.execute(_closed)
636
637         d = defer.execute(_populate_attrs, self.filenode, self.metadata)
638         d.addBoth(_convert_error, request)
639         return d
640
641     def setAttrs(self, attrs):
642         self.log(".setAttrs(%r) denied" % (attrs,), level=OPERATIONAL)
643         def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
644         return defer.execute(_denied)
645
646
647 class GeneralSFTPFile(PrefixingLogMixin):
648     implements(ISFTPFile)
649     """I represent a file handle to a particular file on an SFTP connection.
650     I wrap an instance of OverwriteableFileConsumer, which is responsible for
651     storing the file contents. In order to allow write requests to be satisfied
652     immediately, there is effectively a FIFO queue between requests made to this
653     file handle, and requests to my OverwriteableFileConsumer. This queue is
654     implemented by the callback chain of self.async.
655
656     When first constructed, I am in an 'unopened' state that causes most
657     operations to be delayed until 'open' is called."""
658
659     def __init__(self, userpath, flags, close_notify, convergence):
660         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
661         if noisy: self.log(".__init__(%r, %r = %r, %r, <convergence censored>)" %
662                            (userpath, flags, _repr_flags(flags), close_notify), level=NOISY)
663
664         precondition(isinstance(userpath, str), userpath=userpath)
665         self.userpath = userpath
666         self.flags = flags
667         self.close_notify = close_notify
668         self.convergence = convergence
669         self.async = defer.Deferred()
670         # Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created.
671         self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL)
672         self.closed = False
673         self.abandoned = False
674         self.parent = None
675         self.childname = None
676         self.filenode = None
677         self.metadata = None
678
679         # self.consumer should only be relied on in callbacks for self.async, since it might
680         # not be set before then.
681         self.consumer = None
682
683     def open(self, parent=None, childname=None, filenode=None, metadata=None):
684         self.log(".open(parent=%r, childname=%r, filenode=%r, metadata=%r)" %
685                  (parent, childname, filenode, metadata), level=OPERATIONAL)
686
687         precondition(isinstance(childname, (unicode, NoneType)), childname=childname)
688         precondition(filenode is None or IFileNode.providedBy(filenode), filenode=filenode)
689         precondition(not self.closed, sftpfile=self)
690
691         # If the file has been renamed, the new (parent, childname) takes precedence.
692         if self.parent is None:
693             self.parent = parent
694         if self.childname is None:
695             self.childname = childname
696         self.filenode = filenode
697         self.metadata = metadata
698
699         tempfile_maker = EncryptedTemporaryFile
700
701         if (self.flags & FXF_TRUNC) or not filenode:
702             # We're either truncating or creating the file, so we don't need the old contents.
703             self.consumer = OverwriteableFileConsumer(0, tempfile_maker)
704             self.consumer.download_done("download not needed")
705         else:
706             self.async.addCallback(lambda ignored: filenode.get_best_readable_version())
707
708             def _read(version):
709                 if noisy: self.log("_read", level=NOISY)
710                 download_size = version.get_size()
711                 _assert(download_size is not None)
712
713                 self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker)
714
715                 d = version.read(self.consumer, 0, None)
716                 def _finished(res):
717                     if not isinstance(res, Failure):
718                         res = "download finished"
719                     self.consumer.download_done(res)
720                 d.addBoth(_finished)
721                 # It is correct to drop d here.
722             self.async.addCallback(_read)
723
724         eventually_callback(self.async)(None)
725
726         if noisy: self.log("open done", level=NOISY)
727         return self
728
729     def get_userpath(self):
730         return self.userpath
731
732     def get_direntry(self):
733         return _direntry_for(self.parent, self.childname)
734
735     def rename(self, new_userpath, new_parent, new_childname):
736         self.log(".rename(%r, %r, %r)" % (new_userpath, new_parent, new_childname), level=OPERATIONAL)
737
738         precondition(isinstance(new_userpath, str) and isinstance(new_childname, unicode),
739                      new_userpath=new_userpath, new_childname=new_childname)
740         self.userpath = new_userpath
741         self.parent = new_parent
742         self.childname = new_childname
743
744     def abandon(self):
745         self.log(".abandon()", level=OPERATIONAL)
746
747         self.abandoned = True
748
749     def sync(self, ign=None):
750         # The ign argument allows some_file.sync to be used as a callback.
751         self.log(".sync()", level=OPERATIONAL)
752
753         d = defer.Deferred()
754         self.async.addBoth(eventually_callback(d))
755         def _done(res):
756             if noisy: self.log("_done(%r) in .sync()" % (res,), level=NOISY)
757             return res
758         d.addBoth(_done)
759         return d
760
761     def readChunk(self, offset, length):
762         request = ".readChunk(%r, %r)" % (offset, length)
763         self.log(request, level=OPERATIONAL)
764
765         if not (self.flags & FXF_READ):
766             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for reading")
767             return defer.execute(_denied)
768
769         if self.closed:
770             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
771             return defer.execute(_closed)
772
773         d = defer.Deferred()
774         def _read(ign):
775             if noisy: self.log("_read in readChunk(%r, %r)" % (offset, length), level=NOISY)
776             d2 = self.consumer.read(offset, length)
777             d2.addBoth(eventually_callback(d))
778             # It is correct to drop d2 here.
779             return None
780         self.async.addCallbacks(_read, eventually_errback(d))
781         d.addBoth(_convert_error, request)
782         return d
783
784     def writeChunk(self, offset, data):
785         self.log(".writeChunk(%r, <data of length %r>)" % (offset, len(data)), level=OPERATIONAL)
786
787         if not (self.flags & FXF_WRITE):
788             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
789             return defer.execute(_denied)
790
791         if self.closed:
792             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")
793             return defer.execute(_closed)
794
795         self.has_changed = True
796
797         # Note that we return without waiting for the write to occur. Reads and
798         # close wait for prior writes, and will fail if any prior operation failed.
799         # This is ok because SFTP makes no guarantee that the write completes
800         # before the request does. In fact it explicitly allows write errors to be
801         # delayed until close:
802         #   "One should note that on some server platforms even a close can fail.
803         #    This can happen e.g. if the server operating system caches writes,
804         #    and an error occurs while flushing cached writes during the close."
805
806         def _write(ign):
807             if noisy: self.log("_write in .writeChunk(%r, <data of length %r>), current_size = %r" %
808                                (offset, len(data), self.consumer.get_current_size()), level=NOISY)
809             # FXF_APPEND means that we should always write at the current end of file.
810             write_offset = offset
811             if self.flags & FXF_APPEND:
812                 write_offset = self.consumer.get_current_size()
813
814             self.consumer.overwrite(write_offset, data)
815             if noisy: self.log("overwrite done", level=NOISY)
816             return None
817         self.async.addCallback(_write)
818         # don't addErrback to self.async, just allow subsequent async ops to fail.
819         return defer.succeed(None)
820
821     def _do_close(self, res, d=None):
822         if noisy: self.log("_do_close(%r)" % (res,), level=NOISY)
823         status = None
824         if self.consumer:
825             status = self.consumer.close()
826
827         # We must close_notify before re-firing self.async.
828         if self.close_notify:
829             self.close_notify(self.userpath, self.parent, self.childname, self)
830
831         if not isinstance(res, Failure) and isinstance(status, Failure):
832             res = status
833
834         if d:
835             eventually_callback(d)(res)
836         elif isinstance(res, Failure):
837             self.log("suppressing %r" % (res,), level=OPERATIONAL)
838
839     def close(self):
840         request = ".close()"
841         self.log(request, level=OPERATIONAL)
842
843         if self.closed:
844             return defer.succeed(None)
845
846         # This means that close has been called, not that the close has succeeded.
847         self.closed = True
848
849         if not (self.flags & (FXF_WRITE | FXF_CREAT)):
850             # We never fail a close of a handle opened only for reading, even if the file
851             # failed to download. (We could not do so deterministically, because it would
852             # depend on whether we reached the point of failure before abandoning the
853             # download.) Any reads that depended on file content that could not be downloaded
854             # will have failed. It is important that we don't close the consumer until
855             # previous read operations have completed.
856             self.async.addBoth(self._do_close)
857             return defer.succeed(None)
858
859         # We must capture the abandoned, parent, and childname variables synchronously
860         # at the close call. This is needed by the correctness arguments in the comments
861         # for _abandon_any_heisenfiles and _rename_heisenfiles.
862         # Note that the file must have been opened before it can be closed.
863         abandoned = self.abandoned
864         parent = self.parent
865         childname = self.childname
866
867         # has_changed is set when writeChunk is called, not when the write occurs, so
868         # it is correct to optimize out the commit if it is False at the close call.
869         has_changed = self.has_changed
870
871         def _commit(ign):
872             d2 = self.consumer.when_done()
873             if self.filenode and self.filenode.is_mutable():
874                 self.log("update mutable file %r childname=%r metadata=%r"
875                          % (self.filenode, childname, self.metadata), level=OPERATIONAL)
876                 if self.metadata.get('no-write', False) and not self.filenode.is_readonly():
877                     _assert(parent and childname, parent=parent, childname=childname, metadata=self.metadata)
878                     d2.addCallback(lambda ign: parent.set_metadata_for(childname, self.metadata))
879
880                 d2.addCallback(lambda ign: self.filenode.overwrite(MutableFileHandle(self.consumer.get_file())))
881             else:
882                 def _add_file(ign):
883                     self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
884                     u = FileHandle(self.consumer.get_file(), self.convergence)
885                     return parent.add_file(childname, u, metadata=self.metadata)
886                 d2.addCallback(_add_file)
887             return d2
888
889         # If the file has been abandoned, we don't want the close operation to get "stuck",
890         # even if self.async fails to re-fire. Completing the close independently of self.async
891         # in that case should ensure that dropping an ssh connection is sufficient to abandon
892         # any heisenfiles that were not explicitly closed in that connection.
893         if abandoned or not has_changed:
894             d = defer.succeed(None)
895             self.async.addBoth(self._do_close)
896         else:
897             d = defer.Deferred()
898             self.async.addCallback(_commit)
899             self.async.addBoth(self._do_close, d)
900         d.addBoth(_convert_error, request)
901         return d
902
903     def getAttrs(self):
904         request = ".getAttrs()"
905         self.log(request, level=OPERATIONAL)
906
907         if self.closed:
908             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
909             return defer.execute(_closed)
910
911         # Optimization for read-only handles, when we already know the metadata.
912         if not (self.flags & (FXF_WRITE | FXF_CREAT)) and self.metadata and self.filenode and not self.filenode.is_mutable():
913             return defer.succeed(_populate_attrs(self.filenode, self.metadata))
914
915         d = defer.Deferred()
916         def _get(ign):
917             if noisy: self.log("_get(%r) in %r, filenode = %r, metadata = %r" % (ign, request, self.filenode, self.metadata), level=NOISY)
918
919             # self.filenode might be None, but that's ok.
920             attrs = _populate_attrs(self.filenode, self.metadata, size=self.consumer.get_current_size())
921             eventually_callback(d)(attrs)
922             return None
923         self.async.addCallbacks(_get, eventually_errback(d))
924         d.addBoth(_convert_error, request)
925         return d
926
927     def setAttrs(self, attrs, only_if_at=None):
928         request = ".setAttrs(%r, only_if_at=%r)" % (attrs, only_if_at)
929         self.log(request, level=OPERATIONAL)
930
931         if not (self.flags & FXF_WRITE):
932             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
933             return defer.execute(_denied)
934
935         if self.closed:
936             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot set attributes for a closed file handle")
937             return defer.execute(_closed)
938
939         size = attrs.get("size", None)
940         if size is not None and (not isinstance(size, (int, long)) or size < 0):
941             def _bad(): raise SFTPError(FX_BAD_MESSAGE, "new size is not a valid nonnegative integer")
942             return defer.execute(_bad)
943
944         d = defer.Deferred()
945         def _set(ign):
946             if noisy: self.log("_set(%r) in %r" % (ign, request), level=NOISY)
947             current_direntry = _direntry_for(self.parent, self.childname, self.filenode)
948             if only_if_at and only_if_at != current_direntry:
949                 if noisy: self.log("not setting attributes: current_direntry=%r in %r" %
950                                    (current_direntry, request), level=NOISY)
951                 return None
952
953             now = time()
954             self.metadata = update_metadata(self.metadata, _attrs_to_metadata(attrs), now)
955             if size is not None:
956                 # TODO: should we refuse to truncate a file opened with FXF_APPEND?
957                 # <http://allmydata.org/trac/tahoe-lafs/ticket/1037#comment:20>
958                 self.consumer.set_current_size(size)
959             eventually_callback(d)(None)
960             return None
961         self.async.addCallbacks(_set, eventually_errback(d))
962         d.addBoth(_convert_error, request)
963         return d
964
965
966 class StoppableList:
967     def __init__(self, items):
968         self.items = items
969     def __iter__(self):
970         for i in self.items:
971             yield i
972     def close(self):
973         pass
974
975
976 class Reason:
977     def __init__(self, value):
978         self.value = value
979
980
981 # A "heisenfile" is a file that has been opened with write flags
982 # (FXF_WRITE and/or FXF_CREAT) and not yet close-notified.
983 # 'all_heisenfiles' maps from a direntry string to a list of
984 # GeneralSFTPFile.
985 #
986 # A direntry string is parent_write_uri + "/" + childname_utf8 for
987 # an immutable file, or file_write_uri for a mutable file.
988 # Updates to this dict are single-threaded.
989
990 all_heisenfiles = {}
991
992 def _reload():
993     global all_heisenfiles
994     all_heisenfiles = {}
995
996 class SFTPUserHandler(ConchUser, PrefixingLogMixin):
997     implements(ISFTPServer)
998     def __init__(self, client, rootnode, username):
999         ConchUser.__init__(self)
1000         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=username)
1001         if noisy: self.log(".__init__(%r, %r, %r)" % (client, rootnode, username), level=NOISY)
1002
1003         self.channelLookup["session"] = session.SSHSession
1004         self.subsystemLookup["sftp"] = FileTransferServer
1005
1006         self._client = client
1007         self._root = rootnode
1008         self._username = username
1009         self._convergence = client.convergence
1010
1011         # maps from UTF-8 paths for this user, to files written and still open
1012         self._heisenfiles = {}
1013
1014     def gotVersion(self, otherVersion, extData):
1015         self.log(".gotVersion(%r, %r)" % (otherVersion, extData), level=OPERATIONAL)
1016
1017         # advertise the same extensions as the OpenSSH SFTP server
1018         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
1019         return {'posix-rename@openssh.com': '1',
1020                 'statvfs@openssh.com': '2',
1021                 'fstatvfs@openssh.com': '2',
1022                }
1023
1024     def logout(self):
1025         self.log(".logout()", level=OPERATIONAL)
1026
1027         for files in self._heisenfiles.itervalues():
1028             for f in files:
1029                 f.abandon()
1030
1031     def _add_heisenfile_by_path(self, file):
1032         self.log("._add_heisenfile_by_path(%r)" % (file,), level=OPERATIONAL)
1033
1034         userpath = file.get_userpath()
1035         if userpath in self._heisenfiles:
1036             self._heisenfiles[userpath] += [file]
1037         else:
1038             self._heisenfiles[userpath] = [file]
1039
1040     def _add_heisenfile_by_direntry(self, file):
1041         self.log("._add_heisenfile_by_direntry(%r)" % (file,), level=OPERATIONAL)
1042
1043         direntry = file.get_direntry()
1044         if direntry:
1045             if direntry in all_heisenfiles:
1046                 all_heisenfiles[direntry] += [file]
1047             else:
1048                 all_heisenfiles[direntry] = [file]
1049
1050     def _abandon_any_heisenfiles(self, userpath, direntry):
1051         request = "._abandon_any_heisenfiles(%r, %r)" % (userpath, direntry)
1052         self.log(request, level=OPERATIONAL)
1053
1054         precondition(isinstance(userpath, str), userpath=userpath)
1055
1056         # First we synchronously mark all heisenfiles matching the userpath or direntry
1057         # as abandoned, and remove them from the two heisenfile dicts. Then we .sync()
1058         # each file that we abandoned.
1059         #
1060         # For each file, the call to .abandon() occurs:
1061         #   * before the file is closed, in which case it will never be committed
1062         #     (uploaded+linked or published); or
1063         #   * after it is closed but before it has been close_notified, in which case the
1064         #     .sync() ensures that it has been committed (successfully or not) before we
1065         #     return.
1066         #
1067         # This avoids a race that might otherwise cause the file to be committed after
1068         # the remove operation has completed.
1069         #
1070         # We return a Deferred that fires with True if any files were abandoned (this
1071         # does not mean that they were not committed; it is used to determine whether
1072         # a NoSuchChildError from the attempt to delete the file should be suppressed).
1073
1074         files = []
1075         if direntry in all_heisenfiles:
1076             files = all_heisenfiles[direntry]
1077             del all_heisenfiles[direntry]
1078         if userpath in self._heisenfiles:
1079             files += self._heisenfiles[userpath]
1080             del self._heisenfiles[userpath]
1081
1082         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1083
1084         for f in files:
1085             f.abandon()
1086
1087         d = defer.succeed(None)
1088         for f in files:
1089             d.addBoth(f.sync)
1090
1091         def _done(ign):
1092             self.log("done %r" % (request,), level=OPERATIONAL)
1093             return len(files) > 0
1094         d.addBoth(_done)
1095         return d
1096
1097     def _rename_heisenfiles(self, from_userpath, from_parent, from_childname,
1098                             to_userpath, to_parent, to_childname, overwrite=True):
1099         request = ("._rename_heisenfiles(%r, %r, %r, %r, %r, %r, overwrite=%r)" %
1100                    (from_userpath, from_parent, from_childname, to_userpath, to_parent, to_childname, overwrite))
1101         self.log(request, level=OPERATIONAL)
1102
1103         precondition((isinstance(from_userpath, str) and isinstance(from_childname, unicode) and
1104                       isinstance(to_userpath, str) and isinstance(to_childname, unicode)),
1105                      from_userpath=from_userpath, from_childname=from_childname, to_userpath=to_userpath, to_childname=to_childname)
1106
1107         if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
1108
1109         # First we synchronously rename all heisenfiles matching the userpath or direntry.
1110         # Then we .sync() each file that we renamed.
1111         #
1112         # For each file, the call to .rename occurs:
1113         #   * before the file is closed, in which case it will be committed at the
1114         #     new direntry; or
1115         #   * after it is closed but before it has been close_notified, in which case the
1116         #     .sync() ensures that it has been committed (successfully or not) before we
1117         #     return.
1118         #
1119         # This avoids a race that might otherwise cause the file to be committed at the
1120         # old name after the rename operation has completed.
1121         #
1122         # Note that if overwrite is False, the caller should already have checked
1123         # whether a real direntry exists at the destination. It is possible that another
1124         # direntry (heisen or real) comes to exist at the destination after that check,
1125         # but in that case it is correct for the rename to succeed (and for the commit
1126         # of the heisenfile at the destination to possibly clobber the other entry, since
1127         # that can happen anyway when we have concurrent write handles to the same direntry).
1128         #
1129         # We return a Deferred that fires with True if any files were renamed (this
1130         # does not mean that they were not committed; it is used to determine whether
1131         # a NoSuchChildError from the rename attempt should be suppressed). If overwrite
1132         # is False and there were already heisenfiles at the destination userpath or
1133         # direntry, we return a Deferred that fails with SFTPError(FX_PERMISSION_DENIED).
1134
1135         from_direntry = _direntry_for(from_parent, from_childname)
1136         to_direntry = _direntry_for(to_parent, to_childname)
1137
1138         if noisy: self.log("from_direntry = %r, to_direntry = %r, len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
1139                            (from_direntry, to_direntry, len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
1140
1141         if not overwrite and (to_userpath in self._heisenfiles or to_direntry in all_heisenfiles):
1142             def _existing(): raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1143             if noisy: self.log("existing", level=NOISY)
1144             return defer.execute(_existing)
1145
1146         from_files = []
1147         if from_direntry in all_heisenfiles:
1148             from_files = all_heisenfiles[from_direntry]
1149             del all_heisenfiles[from_direntry]
1150         if from_userpath in self._heisenfiles:
1151             from_files += self._heisenfiles[from_userpath]
1152             del self._heisenfiles[from_userpath]
1153
1154         if noisy: self.log("from_files = %r in %r" % (from_files, request), level=NOISY)
1155
1156         for f in from_files:
1157             f.rename(to_userpath, to_parent, to_childname)
1158             self._add_heisenfile_by_path(f)
1159             self._add_heisenfile_by_direntry(f)
1160
1161         d = defer.succeed(None)
1162         for f in from_files:
1163             d.addBoth(f.sync)
1164
1165         def _done(ign):
1166             if noisy: self.log("done: len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
1167                                (len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
1168             return len(from_files) > 0
1169         d.addBoth(_done)
1170         return d
1171
1172     def _update_attrs_for_heisenfiles(self, userpath, direntry, attrs):
1173         request = "._update_attrs_for_heisenfiles(%r, %r, %r)" % (userpath, direntry, attrs)
1174         self.log(request, level=OPERATIONAL)
1175
1176         _assert(isinstance(userpath, str) and isinstance(direntry, str),
1177                 userpath=userpath, direntry=direntry)
1178
1179         files = []
1180         if direntry in all_heisenfiles:
1181             files = all_heisenfiles[direntry]
1182         if userpath in self._heisenfiles:
1183             files += self._heisenfiles[userpath]
1184
1185         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1186
1187         # We set the metadata for all heisenfiles at this path or direntry.
1188         # Since a direntry includes a write URI, we must have authority to
1189         # change the metadata of heisenfiles found in the all_heisenfiles dict.
1190         # However that's not necessarily the case for heisenfiles found by
1191         # path. Therefore we tell the setAttrs method of each file to only
1192         # perform the update if the file is at the correct direntry.
1193
1194         d = defer.succeed(None)
1195         for f in files:
1196             d.addBoth(f.setAttrs, attrs, only_if_at=direntry)
1197
1198         def _done(ign):
1199             self.log("done %r" % (request,), level=OPERATIONAL)
1200             # TODO: this should not return True if only_if_at caused all files to be skipped.
1201             return len(files) > 0
1202         d.addBoth(_done)
1203         return d
1204
1205     def _sync_heisenfiles(self, userpath, direntry, ignore=None):
1206         request = "._sync_heisenfiles(%r, %r, ignore=%r)" % (userpath, direntry, ignore)
1207         self.log(request, level=OPERATIONAL)
1208
1209         _assert(isinstance(userpath, str) and isinstance(direntry, (str, NoneType)),
1210                 userpath=userpath, direntry=direntry)
1211
1212         files = []
1213         if direntry in all_heisenfiles:
1214             files = all_heisenfiles[direntry]
1215         if userpath in self._heisenfiles:
1216             files += self._heisenfiles[userpath]
1217
1218         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1219
1220         d = defer.succeed(None)
1221         for f in files:
1222             if f is not ignore:
1223                 d.addBoth(f.sync)
1224
1225         def _done(ign):
1226             self.log("done %r" % (request,), level=OPERATIONAL)
1227             return None
1228         d.addBoth(_done)
1229         return d
1230
1231     def _remove_heisenfile(self, userpath, parent, childname, file_to_remove):
1232         if noisy: self.log("._remove_heisenfile(%r, %r, %r, %r)" % (userpath, parent, childname, file_to_remove), level=NOISY)
1233
1234         _assert(isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)),
1235                 userpath=userpath, childname=childname)
1236
1237         direntry = _direntry_for(parent, childname)
1238         if direntry in all_heisenfiles:
1239             all_old_files = all_heisenfiles[direntry]
1240             all_new_files = [f for f in all_old_files if f is not file_to_remove]
1241             if len(all_new_files) > 0:
1242                 all_heisenfiles[direntry] = all_new_files
1243             else:
1244                 del all_heisenfiles[direntry]
1245
1246         if userpath in self._heisenfiles:
1247             old_files = self._heisenfiles[userpath]
1248             new_files = [f for f in old_files if f is not file_to_remove]
1249             if len(new_files) > 0:
1250                 self._heisenfiles[userpath] = new_files
1251             else:
1252                 del self._heisenfiles[userpath]
1253
1254         if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
1255
1256     def _make_file(self, existing_file, userpath, flags, parent=None, childname=None, filenode=None, metadata=None):
1257         if noisy: self.log("._make_file(%r, %r, %r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
1258                            (existing_file, userpath, flags, _repr_flags(flags), parent, childname, filenode, metadata),
1259                            level=NOISY)
1260
1261         _assert((isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)) and
1262                 (metadata is None or 'no-write' in metadata)),
1263                 userpath=userpath, childname=childname, metadata=metadata)
1264
1265         writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
1266         direntry = _direntry_for(parent, childname, filenode)
1267
1268         d = self._sync_heisenfiles(userpath, direntry, ignore=existing_file)
1269
1270         if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
1271             d.addCallback(lambda ign: ShortReadOnlySFTPFile(userpath, filenode, metadata))
1272         else:
1273             close_notify = None
1274             if writing:
1275                 close_notify = self._remove_heisenfile
1276
1277             d.addCallback(lambda ign: existing_file or GeneralSFTPFile(userpath, flags, close_notify, self._convergence))
1278             def _got_file(file):
1279                 file.open(parent=parent, childname=childname, filenode=filenode, metadata=metadata)
1280                 if writing:
1281                     self._add_heisenfile_by_direntry(file)
1282                 return file
1283             d.addCallback(_got_file)
1284         return d
1285
1286     def openFile(self, pathstring, flags, attrs, delay=None):
1287         request = ".openFile(%r, %r = %r, %r, delay=%r)" % (pathstring, flags, _repr_flags(flags), attrs, delay)
1288         self.log(request, level=OPERATIONAL)
1289
1290         # This is used for both reading and writing.
1291         # First exclude invalid combinations of flags, and empty paths.
1292
1293         if not (flags & (FXF_READ | FXF_WRITE)):
1294             def _bad_readwrite():
1295                 raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
1296             return defer.execute(_bad_readwrite)
1297
1298         if (flags & FXF_EXCL) and not (flags & FXF_CREAT):
1299             def _bad_exclcreat():
1300                 raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT")
1301             return defer.execute(_bad_exclcreat)
1302
1303         path = self._path_from_string(pathstring)
1304         if not path:
1305             def _emptypath(): raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty")
1306             return defer.execute(_emptypath)
1307
1308         # The combination of flags is potentially valid.
1309
1310         # To work around clients that have race condition bugs, a getAttr, rename, or
1311         # remove request following an 'open' request with FXF_WRITE or FXF_CREAT flags,
1312         # should succeed even if the 'open' request has not yet completed. So we now
1313         # synchronously add a file object into the self._heisenfiles dict, indexed
1314         # by its UTF-8 userpath. (We can't yet add it to the all_heisenfiles dict,
1315         # because we don't yet have a user-independent path for the file.) The file
1316         # object does not know its filenode, parent, or childname at this point.
1317
1318         userpath = self._path_to_utf8(path)
1319
1320         if flags & (FXF_WRITE | FXF_CREAT):
1321             file = GeneralSFTPFile(userpath, flags, self._remove_heisenfile, self._convergence)
1322             self._add_heisenfile_by_path(file)
1323         else:
1324             # We haven't decided which file implementation to use yet.
1325             file = None
1326
1327         desired_metadata = _attrs_to_metadata(attrs)
1328
1329         # Now there are two major cases:
1330         #
1331         #  1. The path is specified as /uri/FILECAP, with no parent directory.
1332         #     If the FILECAP is mutable and writeable, then we can open it in write-only
1333         #     or read/write mode (non-exclusively), otherwise we can only open it in
1334         #     read-only mode. The open should succeed immediately as long as FILECAP is
1335         #     a valid known filecap that grants the required permission.
1336         #
1337         #  2. The path is specified relative to a parent. We find the parent dirnode and
1338         #     get the child's URI and metadata if it exists. There are four subcases:
1339         #       a. the child does not exist: FXF_CREAT must be set, and we must be able
1340         #          to write to the parent directory.
1341         #       b. the child exists but is not a valid known filecap: fail
1342         #       c. the child is mutable: if we are trying to open it write-only or
1343         #          read/write, then we must be able to write to the file.
1344         #       d. the child is immutable: if we are trying to open it write-only or
1345         #          read/write, then we must be able to write to the parent directory.
1346         #
1347         # To reduce latency, open normally succeeds as soon as these conditions are
1348         # met, even though there might be a failure in downloading the existing file
1349         # or uploading a new one. However, there is an exception: if a file has been
1350         # written, then closed, and is now being reopened, then we have to delay the
1351         # open until the previous upload/publish has completed. This is necessary
1352         # because sshfs does not wait for the result of an FXF_CLOSE message before
1353         # reporting to the client that a file has been closed. It applies both to
1354         # mutable files, and to directory entries linked to an immutable file.
1355         #
1356         # Note that the permission checks below are for more precise error reporting on
1357         # the open call; later operations would fail even if we did not make these checks.
1358
1359         d = delay or defer.succeed(None)
1360         d.addCallback(lambda ign: self._get_root(path))
1361         def _got_root( (root, path) ):
1362             if root.is_unknown():
1363                 raise SFTPError(FX_PERMISSION_DENIED,
1364                                 "cannot open an unknown cap (or child of an unknown object). "
1365                                 "Upgrading the gateway to a later Tahoe-LAFS version may help")
1366             if not path:
1367                 # case 1
1368                 if noisy: self.log("case 1: root = %r, path[:-1] = %r" % (root, path[:-1]), level=NOISY)
1369                 if not IFileNode.providedBy(root):
1370                     raise SFTPError(FX_PERMISSION_DENIED,
1371                                     "cannot open a directory cap")
1372                 if (flags & FXF_WRITE) and root.is_readonly():
1373                     raise SFTPError(FX_PERMISSION_DENIED,
1374                                     "cannot write to a non-writeable filecap without a parent directory")
1375                 if flags & FXF_EXCL:
1376                     raise SFTPError(FX_FAILURE,
1377                                     "cannot create a file exclusively when it already exists")
1378
1379                 # The file does not need to be added to all_heisenfiles, because it is not
1380                 # associated with a directory entry that needs to be updated.
1381
1382                 metadata = update_metadata(None, desired_metadata, time())
1383
1384                 # We have to decide what to pass for the 'parent_readonly' argument to _no_write,
1385                 # given that we don't actually have a parent. This only affects the permissions
1386                 # reported by a getAttrs on this file handle in the case of an immutable file.
1387                 # We choose 'parent_readonly=True' since that will cause the permissions to be
1388                 # reported as r--r--r--, which is appropriate because an immutable file can't be
1389                 # written via this path.
1390
1391                 metadata['no-write'] = _no_write(True, root)
1392                 return self._make_file(file, userpath, flags, filenode=root, metadata=metadata)
1393             else:
1394                 # case 2
1395                 childname = path[-1]
1396
1397                 if noisy: self.log("case 2: root = %r, childname = %r, desired_metadata = %r, path[:-1] = %r" %
1398                                    (root, childname, desired_metadata, path[:-1]), level=NOISY)
1399                 d2 = root.get_child_at_path(path[:-1])
1400                 def _got_parent(parent):
1401                     if noisy: self.log("_got_parent(%r)" % (parent,), level=NOISY)
1402                     if parent.is_unknown():
1403                         raise SFTPError(FX_PERMISSION_DENIED,
1404                                         "cannot open a child of an unknown object. "
1405                                         "Upgrading the gateway to a later Tahoe-LAFS version may help")
1406
1407                     parent_readonly = parent.is_readonly()
1408                     d3 = defer.succeed(None)
1409                     if flags & FXF_EXCL:
1410                         # FXF_EXCL means that the link to the file (not the file itself) must
1411                         # be created atomically wrt updates by this storage client.
1412                         # That is, we need to create the link before returning success to the
1413                         # SFTP open request (and not just on close, as would normally be the
1414                         # case). We make the link initially point to a zero-length LIT file,
1415                         # which is consistent with what might happen on a POSIX filesystem.
1416
1417                         if parent_readonly:
1418                             raise SFTPError(FX_FAILURE,
1419                                             "cannot create a file exclusively when the parent directory is read-only")
1420
1421                         # 'overwrite=False' ensures failure if the link already exists.
1422                         # FIXME: should use a single call to set_uri and return (child, metadata) (#1035)
1423
1424                         zero_length_lit = "URI:LIT:"
1425                         if noisy: self.log("%r.set_uri(%r, None, readcap=%r, overwrite=False)" %
1426                                            (parent, zero_length_lit, childname), level=NOISY)
1427                         d3.addCallback(lambda ign: parent.set_uri(childname, None, readcap=zero_length_lit,
1428                                                                   metadata=desired_metadata, overwrite=False))
1429                         def _seturi_done(child):
1430                             if noisy: self.log("%r.get_metadata_for(%r)" % (parent, childname), level=NOISY)
1431                             d4 = parent.get_metadata_for(childname)
1432                             d4.addCallback(lambda metadata: (child, metadata))
1433                             return d4
1434                         d3.addCallback(_seturi_done)
1435                     else:
1436                         if noisy: self.log("%r.get_child_and_metadata(%r)" % (parent, childname), level=NOISY)
1437                         d3.addCallback(lambda ign: parent.get_child_and_metadata(childname))
1438
1439                     def _got_child( (filenode, current_metadata) ):
1440                         if noisy: self.log("_got_child( (%r, %r) )" % (filenode, current_metadata), level=NOISY)
1441
1442                         metadata = update_metadata(current_metadata, desired_metadata, time())
1443
1444                         # Ignore the permissions of the desired_metadata in an open call. The permissions
1445                         # can only be set by setAttrs.
1446                         metadata['no-write'] = _no_write(parent_readonly, filenode, current_metadata)
1447
1448                         if filenode.is_unknown():
1449                             raise SFTPError(FX_PERMISSION_DENIED,
1450                                             "cannot open an unknown cap. Upgrading the gateway "
1451                                             "to a later Tahoe-LAFS version may help")
1452                         if not IFileNode.providedBy(filenode):
1453                             raise SFTPError(FX_PERMISSION_DENIED,
1454                                             "cannot open a directory as if it were a file")
1455                         if (flags & FXF_WRITE) and metadata['no-write']:
1456                             raise SFTPError(FX_PERMISSION_DENIED,
1457                                             "cannot open a non-writeable file for writing")
1458
1459                         return self._make_file(file, userpath, flags, parent=parent, childname=childname,
1460                                                filenode=filenode, metadata=metadata)
1461                     def _no_child(f):
1462                         if noisy: self.log("_no_child(%r)" % (f,), level=NOISY)
1463                         f.trap(NoSuchChildError)
1464
1465                         if not (flags & FXF_CREAT):
1466                             raise SFTPError(FX_NO_SUCH_FILE,
1467                                             "the file does not exist, and was not opened with the creation (CREAT) flag")
1468                         if parent_readonly:
1469                             raise SFTPError(FX_PERMISSION_DENIED,
1470                                             "cannot create a file when the parent directory is read-only")
1471
1472                         return self._make_file(file, userpath, flags, parent=parent, childname=childname)
1473                     d3.addCallbacks(_got_child, _no_child)
1474                     return d3
1475
1476                 d2.addCallback(_got_parent)
1477                 return d2
1478
1479         d.addCallback(_got_root)
1480         def _remove_on_error(err):
1481             if file:
1482                 self._remove_heisenfile(userpath, None, None, file)
1483             return err
1484         d.addErrback(_remove_on_error)
1485         d.addBoth(_convert_error, request)
1486         return d
1487
1488     def renameFile(self, from_pathstring, to_pathstring, overwrite=False):
1489         request = ".renameFile(%r, %r)" % (from_pathstring, to_pathstring)
1490         self.log(request, level=OPERATIONAL)
1491
1492         from_path = self._path_from_string(from_pathstring)
1493         to_path = self._path_from_string(to_pathstring)
1494         from_userpath = self._path_to_utf8(from_path)
1495         to_userpath = self._path_to_utf8(to_path)
1496
1497         # the target directory must already exist
1498         d = deferredutil.gatherResults([self._get_parent_or_node(from_path),
1499                                         self._get_parent_or_node(to_path)])
1500         def _got( (from_pair, to_pair) ):
1501             if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r, overwrite=%r)" %
1502                                (from_pair, to_pair, from_pathstring, to_pathstring, overwrite), level=NOISY)
1503             (from_parent, from_childname) = from_pair
1504             (to_parent, to_childname) = to_pair
1505
1506             if from_childname is None:
1507                 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename a source object specified by URI")
1508             if to_childname is None:
1509                 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename to a destination specified by URI")
1510
1511             # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.5>
1512             # "It is an error if there already exists a file with the name specified
1513             #  by newpath."
1514             # OpenSSH's SFTP server returns FX_PERMISSION_DENIED for this error.
1515             #
1516             # For the standard SSH_FXP_RENAME operation, overwrite=False.
1517             # We also support the posix-rename@openssh.com extension, which uses overwrite=True.
1518
1519             d2 = defer.succeed(None)
1520             if not overwrite:
1521                 d2.addCallback(lambda ign: to_parent.get(to_childname))
1522                 def _expect_fail(res):
1523                     if not isinstance(res, Failure):
1524                         raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1525
1526                     # It is OK if we fail for errors other than NoSuchChildError, since that probably
1527                     # indicates some problem accessing the destination directory.
1528                     res.trap(NoSuchChildError)
1529                 d2.addBoth(_expect_fail)
1530
1531             # If there are heisenfiles to be written at the 'from' direntry, then ensure
1532             # they will now be written at the 'to' direntry instead.
1533             d2.addCallback(lambda ign:
1534                            self._rename_heisenfiles(from_userpath, from_parent, from_childname,
1535                                                     to_userpath, to_parent, to_childname, overwrite=overwrite))
1536
1537             def _move(renamed):
1538                 # FIXME: use move_child_to_path to avoid possible data loss due to #943
1539                 #d3 = from_parent.move_child_to_path(from_childname, to_root, to_path, overwrite=overwrite)
1540
1541                 d3 = from_parent.move_child_to(from_childname, to_parent, to_childname, overwrite=overwrite)
1542                 def _check(err):
1543                     if noisy: self.log("_check(%r) in .renameFile(%r, %r, overwrite=%r)" %
1544                                        (err, from_pathstring, to_pathstring, overwrite), level=NOISY)
1545
1546                     if not isinstance(err, Failure) or (renamed and err.check(NoSuchChildError)):
1547                         return None
1548                     if not overwrite and err.check(ExistingChildError):
1549                         raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1550
1551                     return err
1552                 d3.addBoth(_check)
1553                 return d3
1554             d2.addCallback(_move)
1555             return d2
1556         d.addCallback(_got)
1557         d.addBoth(_convert_error, request)
1558         return d
1559
1560     def makeDirectory(self, pathstring, attrs):
1561         request = ".makeDirectory(%r, %r)" % (pathstring, attrs)
1562         self.log(request, level=OPERATIONAL)
1563
1564         path = self._path_from_string(pathstring)
1565         metadata = _attrs_to_metadata(attrs)
1566         if 'no-write' in metadata:
1567             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "cannot create a directory that is initially read-only")
1568             return defer.execute(_denied)
1569
1570         d = self._get_root(path)
1571         d.addCallback(lambda (root, path):
1572                       self._get_or_create_directories(root, path, metadata))
1573         d.addBoth(_convert_error, request)
1574         return d
1575
1576     def _get_or_create_directories(self, node, path, metadata):
1577         if not IDirectoryNode.providedBy(node):
1578             # TODO: provide the name of the blocking file in the error message.
1579             def _blocked(): raise SFTPError(FX_FAILURE, "cannot create directory because there "
1580                                                         "is a file in the way") # close enough
1581             return defer.execute(_blocked)
1582
1583         if not path:
1584             return defer.succeed(node)
1585         d = node.get(path[0])
1586         def _maybe_create(f):
1587             f.trap(NoSuchChildError)
1588             return node.create_subdirectory(path[0])
1589         d.addErrback(_maybe_create)
1590         d.addCallback(self._get_or_create_directories, path[1:], metadata)
1591         return d
1592
1593     def removeFile(self, pathstring):
1594         request = ".removeFile(%r)" % (pathstring,)
1595         self.log(request, level=OPERATIONAL)
1596
1597         path = self._path_from_string(pathstring)
1598         d = self._remove_object(path, must_be_file=True)
1599         d.addBoth(_convert_error, request)
1600         return d
1601
1602     def removeDirectory(self, pathstring):
1603         request = ".removeDirectory(%r)" % (pathstring,)
1604         self.log(request, level=OPERATIONAL)
1605
1606         path = self._path_from_string(pathstring)
1607         d = self._remove_object(path, must_be_directory=True)
1608         d.addBoth(_convert_error, request)
1609         return d
1610
1611     def _remove_object(self, path, must_be_directory=False, must_be_file=False):
1612         userpath = self._path_to_utf8(path)
1613         d = self._get_parent_or_node(path)
1614         def _got_parent( (parent, childname) ):
1615             if childname is None:
1616                 raise SFTPError(FX_NO_SUCH_FILE, "cannot remove an object specified by URI")
1617
1618             direntry = _direntry_for(parent, childname)
1619             d2 = defer.succeed(False)
1620             if not must_be_directory:
1621                 d2.addCallback(lambda ign: self._abandon_any_heisenfiles(userpath, direntry))
1622
1623             d2.addCallback(lambda abandoned:
1624                            parent.delete(childname, must_exist=not abandoned,
1625                                          must_be_directory=must_be_directory, must_be_file=must_be_file))
1626             return d2
1627         d.addCallback(_got_parent)
1628         return d
1629
1630     def openDirectory(self, pathstring):
1631         request = ".openDirectory(%r)" % (pathstring,)
1632         self.log(request, level=OPERATIONAL)
1633
1634         path = self._path_from_string(pathstring)
1635         d = self._get_parent_or_node(path)
1636         def _got_parent_or_node( (parent_or_node, childname) ):
1637             if noisy: self.log("_got_parent_or_node( (%r, %r) ) in openDirectory(%r)" %
1638                                (parent_or_node, childname, pathstring), level=NOISY)
1639             if childname is None:
1640                 return parent_or_node
1641             else:
1642                 return parent_or_node.get(childname)
1643         d.addCallback(_got_parent_or_node)
1644         def _list(dirnode):
1645             if dirnode.is_unknown():
1646                 raise SFTPError(FX_PERMISSION_DENIED,
1647                                 "cannot list an unknown cap as a directory. Upgrading the gateway "
1648                                 "to a later Tahoe-LAFS version may help")
1649             if not IDirectoryNode.providedBy(dirnode):
1650                 raise SFTPError(FX_PERMISSION_DENIED,
1651                                 "cannot list a file as if it were a directory")
1652
1653             d2 = dirnode.list()
1654             def _render(children):
1655                 parent_readonly = dirnode.is_readonly()
1656                 results = []
1657                 for filename, (child, metadata) in children.iteritems():
1658                     # The file size may be cached or absent.
1659                     metadata['no-write'] = _no_write(parent_readonly, child, metadata)
1660                     attrs = _populate_attrs(child, metadata)
1661                     filename_utf8 = filename.encode('utf-8')
1662                     longname = _lsLine(filename_utf8, attrs)
1663                     results.append( (filename_utf8, longname, attrs) )
1664                 return StoppableList(results)
1665             d2.addCallback(_render)
1666             return d2
1667         d.addCallback(_list)
1668         d.addBoth(_convert_error, request)
1669         return d
1670
1671     def getAttrs(self, pathstring, followLinks):
1672         request = ".getAttrs(%r, followLinks=%r)" % (pathstring, followLinks)
1673         self.log(request, level=OPERATIONAL)
1674
1675         # When asked about a specific file, report its current size.
1676         # TODO: the modification time for a mutable file should be
1677         # reported as the update time of the best version. But that
1678         # information isn't currently stored in mutable shares, I think.
1679
1680         path = self._path_from_string(pathstring)
1681         userpath = self._path_to_utf8(path)
1682         d = self._get_parent_or_node(path)
1683         def _got_parent_or_node( (parent_or_node, childname) ):
1684             if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
1685
1686             # Some clients will incorrectly try to get the attributes
1687             # of a file immediately after opening it, before it has been put
1688             # into the all_heisenfiles table. This is a race condition bug in
1689             # the client, but we handle it anyway by calling .sync() on all
1690             # files matching either the path or the direntry.
1691
1692             direntry = _direntry_for(parent_or_node, childname)
1693             d2 = self._sync_heisenfiles(userpath, direntry)
1694
1695             if childname is None:
1696                 node = parent_or_node
1697                 d2.addCallback(lambda ign: node.get_current_size())
1698                 d2.addCallback(lambda size:
1699                                _populate_attrs(node, {'no-write': node.is_unknown() or node.is_readonly()}, size=size))
1700             else:
1701                 parent = parent_or_node
1702                 d2.addCallback(lambda ign: parent.get_child_and_metadata_at_path([childname]))
1703                 def _got( (child, metadata) ):
1704                     if noisy: self.log("_got( (%r, %r) )" % (child, metadata), level=NOISY)
1705                     _assert(IDirectoryNode.providedBy(parent), parent=parent)
1706                     metadata['no-write'] = _no_write(parent.is_readonly(), child, metadata)
1707                     d3 = child.get_current_size()
1708                     d3.addCallback(lambda size: _populate_attrs(child, metadata, size=size))
1709                     return d3
1710                 def _nosuch(err):
1711                     if noisy: self.log("_nosuch(%r)" % (err,), level=NOISY)
1712                     err.trap(NoSuchChildError)
1713                     if noisy: self.log("checking open files:\nself._heisenfiles = %r\nall_heisenfiles = %r\ndirentry=%r" %
1714                                        (self._heisenfiles, all_heisenfiles, direntry), level=NOISY)
1715                     if direntry in all_heisenfiles:
1716                         files = all_heisenfiles[direntry]
1717                         if len(files) == 0:  # pragma: no cover
1718                             return err
1719                         # use the heisenfile that was most recently opened
1720                         return files[-1].getAttrs()
1721                     return err
1722                 d2.addCallbacks(_got, _nosuch)
1723             return d2
1724         d.addCallback(_got_parent_or_node)
1725         d.addBoth(_convert_error, request)
1726         return d
1727
1728     def setAttrs(self, pathstring, attrs):
1729         request = ".setAttrs(%r, %r)" % (pathstring, attrs)
1730         self.log(request, level=OPERATIONAL)
1731
1732         if "size" in attrs:
1733             # this would require us to download and re-upload the truncated/extended
1734             # file contents
1735             def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "setAttrs wth size attribute unsupported")
1736             return defer.execute(_unsupported)
1737
1738         path = self._path_from_string(pathstring)
1739         userpath = self._path_to_utf8(path)
1740         d = self._get_parent_or_node(path)
1741         def _got_parent_or_node( (parent_or_node, childname) ):
1742             if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
1743
1744             direntry = _direntry_for(parent_or_node, childname)
1745             d2 = self._update_attrs_for_heisenfiles(userpath, direntry, attrs)
1746
1747             def _update(updated_heisenfiles):
1748                 if childname is None:
1749                     if updated_heisenfiles:
1750                         return None
1751                     raise SFTPError(FX_NO_SUCH_FILE, userpath)
1752                 else:
1753                     desired_metadata = _attrs_to_metadata(attrs)
1754                     if noisy: self.log("desired_metadata = %r" % (desired_metadata,), level=NOISY)
1755
1756                     d3 = parent_or_node.set_metadata_for(childname, desired_metadata)
1757                     def _nosuch(err):
1758                         if updated_heisenfiles:
1759                             err.trap(NoSuchChildError)
1760                         else:
1761                             return err
1762                     d3.addErrback(_nosuch)
1763                     return d3
1764             d2.addCallback(_update)
1765             d2.addCallback(lambda ign: None)
1766             return d2
1767         d.addCallback(_got_parent_or_node)
1768         d.addBoth(_convert_error, request)
1769         return d
1770
1771     def readLink(self, pathstring):
1772         self.log(".readLink(%r)" % (pathstring,), level=OPERATIONAL)
1773
1774         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "readLink")
1775         return defer.execute(_unsupported)
1776
1777     def makeLink(self, linkPathstring, targetPathstring):
1778         self.log(".makeLink(%r, %r)" % (linkPathstring, targetPathstring), level=OPERATIONAL)
1779
1780         # If this is implemented, note the reversal of arguments described in point 7 of
1781         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>.
1782
1783         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "makeLink")
1784         return defer.execute(_unsupported)
1785
1786     def extendedRequest(self, extensionName, extensionData):
1787         self.log(".extendedRequest(%r, <data of length %r>)" % (extensionName, len(extensionData)), level=OPERATIONAL)
1788
1789         # We implement the three main OpenSSH SFTP extensions; see
1790         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
1791
1792         if extensionName == 'posix-rename@openssh.com':
1793             def _bad(): raise SFTPError(FX_BAD_MESSAGE, "could not parse posix-rename@openssh.com request")
1794
1795             if 4 > len(extensionData): return defer.execute(_bad)
1796             (fromPathLen,) = struct.unpack('>L', extensionData[0:4])
1797             if 8 + fromPathLen > len(extensionData): return defer.execute(_bad)
1798
1799             (toPathLen,) = struct.unpack('>L', extensionData[(4 + fromPathLen):(8 + fromPathLen)])
1800             if 8 + fromPathLen + toPathLen != len(extensionData): return defer.execute(_bad)
1801
1802             fromPathstring = extensionData[4:(4 + fromPathLen)]
1803             toPathstring = extensionData[(8 + fromPathLen):]
1804             d = self.renameFile(fromPathstring, toPathstring, overwrite=True)
1805
1806             # Twisted conch assumes that the response from an extended request is either
1807             # an error, or an FXP_EXTENDED_REPLY. But it happens to do the right thing
1808             # (respond with an FXP_STATUS message) if we return a Failure with code FX_OK.
1809             def _succeeded(ign):
1810                 raise SFTPError(FX_OK, "request succeeded")
1811             d.addCallback(_succeeded)
1812             return d
1813
1814         if extensionName == 'statvfs@openssh.com' or extensionName == 'fstatvfs@openssh.com':
1815             # f_bsize and f_frsize should be the same to avoid a bug in 'df'
1816             return defer.succeed(struct.pack('>11Q',
1817                 1024,         # uint64  f_bsize     /* file system block size */
1818                 1024,         # uint64  f_frsize    /* fundamental fs block size */
1819                 628318530,    # uint64  f_blocks    /* number of blocks (unit f_frsize) */
1820                 314159265,    # uint64  f_bfree     /* free blocks in file system */
1821                 314159265,    # uint64  f_bavail    /* free blocks for non-root */
1822                 200000000,    # uint64  f_files     /* total file inodes */
1823                 100000000,    # uint64  f_ffree     /* free file inodes */
1824                 100000000,    # uint64  f_favail    /* free file inodes for non-root */
1825                 0x1AF5,       # uint64  f_fsid      /* file system id */
1826                 2,            # uint64  f_flag      /* bit mask = ST_NOSUID; not ST_RDONLY */
1827                 65535,        # uint64  f_namemax   /* maximum filename length */
1828                 ))
1829
1830         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "unsupported %r request <data of length %r>" %
1831                                                                (extensionName, len(extensionData)))
1832         return defer.execute(_unsupported)
1833
1834     def realPath(self, pathstring):
1835         self.log(".realPath(%r)" % (pathstring,), level=OPERATIONAL)
1836
1837         return self._path_to_utf8(self._path_from_string(pathstring))
1838
1839     def _path_to_utf8(self, path):
1840         return (u"/" + u"/".join(path)).encode('utf-8')
1841
1842     def _path_from_string(self, pathstring):
1843         if noisy: self.log("CONVERT %r" % (pathstring,), level=NOISY)
1844
1845         _assert(isinstance(pathstring, str), pathstring=pathstring)
1846
1847         # The home directory is the root directory.
1848         pathstring = pathstring.strip("/")
1849         if pathstring == "" or pathstring == ".":
1850             path_utf8 = []
1851         else:
1852             path_utf8 = pathstring.split("/")
1853
1854         # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.2>
1855         # "Servers SHOULD interpret a path name component ".." as referring to
1856         #  the parent directory, and "." as referring to the current directory."
1857         path = []
1858         for p_utf8 in path_utf8:
1859             if p_utf8 == "..":
1860                 # ignore excess .. components at the root
1861                 if len(path) > 0:
1862                     path = path[:-1]
1863             elif p_utf8 != ".":
1864                 try:
1865                     p = p_utf8.decode('utf-8', 'strict')
1866                 except UnicodeError:
1867                     raise SFTPError(FX_NO_SUCH_FILE, "path could not be decoded as UTF-8")
1868                 path.append(p)
1869
1870         if noisy: self.log(" PATH %r" % (path,), level=NOISY)
1871         return path
1872
1873     def _get_root(self, path):
1874         # return Deferred (root, remaining_path)
1875         d = defer.succeed(None)
1876         if path and path[0] == u"uri":
1877             d.addCallback(lambda ign: self._client.create_node_from_uri(path[1].encode('utf-8')))
1878             d.addCallback(lambda root: (root, path[2:]))
1879         else:
1880             d.addCallback(lambda ign: (self._root, path))
1881         return d
1882
1883     def _get_parent_or_node(self, path):
1884         # return Deferred (parent, childname) or (node, None)
1885         d = self._get_root(path)
1886         def _got_root( (root, remaining_path) ):
1887             if not remaining_path:
1888                 return (root, None)
1889             else:
1890                 d2 = root.get_child_at_path(remaining_path[:-1])
1891                 d2.addCallback(lambda parent: (parent, remaining_path[-1]))
1892                 return d2
1893         d.addCallback(_got_root)
1894         return d
1895
1896
1897 class FakeTransport:
1898     implements(ITransport)
1899     def write(self, data):
1900         logmsg("FakeTransport.write(<data of length %r>)" % (len(data),), level=NOISY)
1901
1902     def writeSequence(self, data):
1903         logmsg("FakeTransport.writeSequence(...)", level=NOISY)
1904
1905     def loseConnection(self):
1906         logmsg("FakeTransport.loseConnection()", level=NOISY)
1907
1908     # getPeer and getHost can just raise errors, since we don't know what to return
1909
1910
1911 class ShellSession(PrefixingLogMixin):
1912     implements(ISession)
1913     def __init__(self, userHandler):
1914         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
1915         if noisy: self.log(".__init__(%r)" % (userHandler), level=NOISY)
1916
1917     def getPty(self, terminal, windowSize, attrs):
1918         self.log(".getPty(%r, %r, %r)" % (terminal, windowSize, attrs), level=OPERATIONAL)
1919
1920     def openShell(self, protocol):
1921         self.log(".openShell(%r)" % (protocol,), level=OPERATIONAL)
1922         if hasattr(protocol, 'transport') and protocol.transport is None:
1923             protocol.transport = FakeTransport()  # work around Twisted bug
1924
1925         return self._unsupported(protocol)
1926
1927     def execCommand(self, protocol, cmd):
1928         self.log(".execCommand(%r, %r)" % (protocol, cmd), level=OPERATIONAL)
1929         if hasattr(protocol, 'transport') and protocol.transport is None:
1930             protocol.transport = FakeTransport()  # work around Twisted bug
1931
1932         d = defer.succeed(None)
1933         if cmd == "df -P -k /":
1934             d.addCallback(lambda ign: protocol.write(
1935                           "Filesystem         1024-blocks      Used Available Capacity Mounted on\r\n"
1936                           "tahoe                628318530 314159265 314159265      50% /\r\n"))
1937             d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessDone(None))))
1938         else:
1939             d.addCallback(lambda ign: self._unsupported(protocol))
1940         return d
1941
1942     def _unsupported(self, protocol):
1943         d = defer.succeed(None)
1944         d.addCallback(lambda ign: protocol.errReceived(
1945                       "This server supports only the SFTP protocol. It does not support SCP,\r\n"
1946                       "interactive shell sessions, or commands other than one needed by sshfs.\r\n"))
1947         d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessTerminated(exitCode=1))))
1948         return d
1949
1950     def windowChanged(self, newWindowSize):
1951         self.log(".windowChanged(%r)" % (newWindowSize,), level=OPERATIONAL)
1952
1953     def eofReceived(self):
1954         self.log(".eofReceived()", level=OPERATIONAL)
1955
1956     def closed(self):
1957         self.log(".closed()", level=OPERATIONAL)
1958
1959
1960 # If you have an SFTPUserHandler and want something that provides ISession, you get
1961 # ShellSession(userHandler).
1962 # We use adaptation because this must be a different object to the SFTPUserHandler.
1963 components.registerAdapter(ShellSession, SFTPUserHandler, ISession)
1964
1965
1966 from allmydata.frontends.auth import AccountURLChecker, AccountFileChecker, NeedRootcapLookupScheme
1967
1968 class Dispatcher:
1969     implements(portal.IRealm)
1970     def __init__(self, client):
1971         self._client = client
1972
1973     def requestAvatar(self, avatarID, mind, interface):
1974         _assert(interface == IConchUser, interface=interface)
1975         rootnode = self._client.create_node_from_uri(avatarID.rootcap)
1976         handler = SFTPUserHandler(self._client, rootnode, avatarID.username)
1977         return (interface, handler, handler.logout)
1978
1979
1980 class SFTPServer(service.MultiService):
1981     def __init__(self, client, accountfile, accounturl,
1982                  sftp_portstr, pubkey_file, privkey_file):
1983         precondition(isinstance(accountfile, (unicode, NoneType)), accountfile)
1984         precondition(isinstance(pubkey_file, unicode), pubkey_file)
1985         precondition(isinstance(privkey_file, unicode), privkey_file)
1986         service.MultiService.__init__(self)
1987
1988         r = Dispatcher(client)
1989         p = portal.Portal(r)
1990
1991         if accountfile:
1992             c = AccountFileChecker(self, accountfile)
1993             p.registerChecker(c)
1994         if accounturl:
1995             c = AccountURLChecker(self, accounturl)
1996             p.registerChecker(c)
1997         if not accountfile and not accounturl:
1998             # we could leave this anonymous, with just the /uri/CAP form
1999             raise NeedRootcapLookupScheme("must provide an account file or URL")
2000
2001         pubkey = keys.Key.fromFile(pubkey_file.encode(get_filesystem_encoding()))
2002         privkey = keys.Key.fromFile(privkey_file.encode(get_filesystem_encoding()))
2003         class SSHFactory(factory.SSHFactory):
2004             publicKeys = {pubkey.sshType(): pubkey}
2005             privateKeys = {privkey.sshType(): privkey}
2006             def getPrimes(self):
2007                 try:
2008                     # if present, this enables diffie-hellman-group-exchange
2009                     return primes.parseModuliFile("/etc/ssh/moduli")
2010                 except IOError:
2011                     return None
2012
2013         f = SSHFactory()
2014         f.portal = p
2015
2016         s = strports.service(sftp_portstr, f)
2017         s.setServiceParent(self)