]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/sftpd.py
SFTP: further improve test coverage (paths containing '.', bad data for posix-rename...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / sftpd.py
1
2 import os, tempfile, heapq, binascii, traceback, array, stat, struct
3 from types import NoneType
4 from stat import S_IFREG, S_IFDIR
5 from time import time, strftime, localtime
6
7 from zope.interface import implements
8 from twisted.python import components
9 from twisted.application import service, strports
10 from twisted.conch.ssh import factory, keys, session
11 from twisted.conch.ssh.filetransfer import FileTransferServer, SFTPError, \
12      FX_NO_SUCH_FILE, FX_OP_UNSUPPORTED, FX_PERMISSION_DENIED, FX_EOF, \
13      FX_BAD_MESSAGE, FX_FAILURE, FX_OK
14 from twisted.conch.ssh.filetransfer import FXF_READ, FXF_WRITE, FXF_APPEND, \
15      FXF_CREAT, FXF_TRUNC, FXF_EXCL
16 from twisted.conch.interfaces import ISFTPServer, ISFTPFile, IConchUser, ISession
17 from twisted.conch.avatar import ConchUser
18 from twisted.conch.openssh_compat import primes
19 from twisted.cred import portal
20 from twisted.internet.error import ProcessDone, ProcessTerminated
21 from twisted.python.failure import Failure
22 from twisted.internet.interfaces import ITransport
23
24 from twisted.internet import defer
25 from twisted.internet.interfaces import IFinishableConsumer
26 from foolscap.api import eventually
27 from allmydata.util import deferredutil
28
29 from allmydata.util.consumer import download_to_data
30 from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
31      NoSuchChildError, ChildOfWrongTypeError
32 from allmydata.mutable.common import NotWriteableError
33 from allmydata.immutable.upload import FileHandle
34 from allmydata.dirnode import update_metadata
35
36 from pycryptopp.cipher.aes import AES
37
38 noisy = True
39 use_foolscap_logging = True
40
41 from allmydata.util.log import NOISY, OPERATIONAL, WEIRD, \
42     msg as _msg, err as _err, PrefixingLogMixin as _PrefixingLogMixin
43
44 if use_foolscap_logging:
45     (logmsg, logerr, PrefixingLogMixin) = (_msg, _err, _PrefixingLogMixin)
46 else:  # pragma: no cover
47     def logmsg(s, level=None):
48         print s
49     def logerr(s, level=None):
50         print s
51     class PrefixingLogMixin:
52         def __init__(self, facility=None, prefix=''):
53             self.prefix = prefix
54         def log(self, s, level=None):
55             print "%r %s" % (self.prefix, s)
56
57
58 def eventually_callback(d):
59     return lambda res: eventually(d.callback, res)
60
61 def eventually_errback(d):
62     return lambda err: eventually(d.errback, err)
63
64
65 def _utf8(x):
66     if isinstance(x, unicode):
67         return x.encode('utf-8')
68     if isinstance(x, str):
69         return x
70     return repr(x)
71
72
73 def _to_sftp_time(t):
74     """SFTP times are unsigned 32-bit integers representing UTC seconds
75     (ignoring leap seconds) since the Unix epoch, January 1 1970 00:00 UTC.
76     A Tahoe time is the corresponding float."""
77     return long(t) & 0xFFFFFFFFL
78
79
80 def _convert_error(res, request):
81     if not isinstance(res, Failure):
82         logged_res = res
83         if isinstance(res, str): logged_res = "<data of length %r>" % (len(res),)
84         logmsg("SUCCESS %r %r" % (request, logged_res,), level=OPERATIONAL)
85         return res
86
87     err = res
88     logmsg("RAISE %r %r" % (request, err.value), level=OPERATIONAL)
89     try:
90         if noisy: logmsg(traceback.format_exc(err.value), level=NOISY)
91     except:  # pragma: no cover
92         pass
93
94     # The message argument to SFTPError must not reveal information that
95     # might compromise anonymity.
96
97     if err.check(SFTPError):
98         # original raiser of SFTPError has responsibility to ensure anonymity
99         raise err
100     if err.check(NoSuchChildError):
101         childname = _utf8(err.value.args[0])
102         raise SFTPError(FX_NO_SUCH_FILE, childname)
103     if err.check(NotWriteableError) or err.check(ChildOfWrongTypeError):
104         msg = _utf8(err.value.args[0])
105         raise SFTPError(FX_PERMISSION_DENIED, msg)
106     if err.check(ExistingChildError):
107         # Versions of SFTP after v3 (which is what twisted.conch implements)
108         # define a specific error code for this case: FX_FILE_ALREADY_EXISTS.
109         # However v3 doesn't; instead, other servers such as sshd return
110         # FX_FAILURE. The gvfs SFTP backend, for example, depends on this
111         # to translate the error to the equivalent of POSIX EEXIST, which is
112         # necessary for some picky programs (such as gedit).
113         msg = _utf8(err.value.args[0])
114         raise SFTPError(FX_FAILURE, msg)
115     if err.check(NotImplementedError):
116         raise SFTPError(FX_OP_UNSUPPORTED, _utf8(err.value))
117     if err.check(EOFError):
118         raise SFTPError(FX_EOF, "end of file reached")
119     if err.check(defer.FirstError):
120         _convert_error(err.value.subFailure, request)
121
122     # We assume that the error message is not anonymity-sensitive.
123     raise SFTPError(FX_FAILURE, _utf8(err.value))
124
125
126 def _repr_flags(flags):
127     return "|".join([f for f in
128                      [(flags & FXF_READ)   and "FXF_READ"   or None,
129                       (flags & FXF_WRITE)  and "FXF_WRITE"  or None,
130                       (flags & FXF_APPEND) and "FXF_APPEND" or None,
131                       (flags & FXF_CREAT)  and "FXF_CREAT"  or None,
132                       (flags & FXF_TRUNC)  and "FXF_TRUNC"  or None,
133                       (flags & FXF_EXCL)   and "FXF_EXCL"   or None,
134                      ]
135                      if f])
136
137
138 def _lsLine(name, attrs):
139     st_uid = "tahoe"
140     st_gid = "tahoe"
141     st_mtime = attrs.get("mtime", 0)
142     st_mode = attrs["permissions"]
143     # TODO: check that clients are okay with this being a "?".
144     # (They should be because the longname is intended for human
145     # consumption.)
146     st_size = attrs.get("size", "?")
147     # We don't know how many links there really are to this object.
148     st_nlink = 1
149
150     # Based on <http://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
151     # We can't call the version in Twisted because we might have a version earlier than
152     # <http://twistedmatrix.com/trac/changeset/25412> (released in Twisted 8.2).
153
154     mode = st_mode
155     perms = array.array('c', '-'*10)
156     ft = stat.S_IFMT(mode)
157     if   stat.S_ISDIR(ft): perms[0] = 'd'
158     elif stat.S_ISREG(ft): perms[0] = '-'
159     else: perms[0] = '?'
160     # user
161     if mode&stat.S_IRUSR: perms[1] = 'r'
162     if mode&stat.S_IWUSR: perms[2] = 'w'
163     if mode&stat.S_IXUSR: perms[3] = 'x'
164     # group
165     if mode&stat.S_IRGRP: perms[4] = 'r'
166     if mode&stat.S_IWGRP: perms[5] = 'w'
167     if mode&stat.S_IXGRP: perms[6] = 'x'
168     # other
169     if mode&stat.S_IROTH: perms[7] = 'r'
170     if mode&stat.S_IWOTH: perms[8] = 'w'
171     if mode&stat.S_IXOTH: perms[9] = 'x'
172     # suid/sgid never set
173
174     l = perms.tostring()
175     l += str(st_nlink).rjust(5) + ' '
176     un = str(st_uid)
177     l += un.ljust(9)
178     gr = str(st_gid)
179     l += gr.ljust(9)
180     sz = str(st_size)
181     l += sz.rjust(8)
182     l += ' '
183     day = 60 * 60 * 24
184     sixmo = day * 7 * 26
185     now = time()
186     if st_mtime + sixmo < now or st_mtime > now + day:
187         # mtime is more than 6 months ago, or more than one day in the future
188         l += strftime("%b %d  %Y ", localtime(st_mtime))
189     else:
190         l += strftime("%b %d %H:%M ", localtime(st_mtime))
191     l += name
192     return l
193
194
195 def _no_write(parent_readonly, child, metadata=None):
196     """Whether child should be listed as having read-only permissions in parent."""
197
198     if child.is_unknown():
199         return True
200     elif child.is_mutable():
201         return child.is_readonly()
202     elif parent_readonly or IDirectoryNode.providedBy(child):
203         return True
204     else:
205         return metadata is not None and metadata.get('no-write', False)
206
207
208 def _populate_attrs(childnode, metadata, size=None):
209     attrs = {}
210
211     # The permissions must have the S_IFDIR (040000) or S_IFREG (0100000)
212     # bits, otherwise the client may refuse to open a directory.
213     # Also, sshfs run as a non-root user requires files and directories
214     # to be world-readable/writeable.
215     # It is important that we never set the executable bits on files.
216     #
217     # Directories and unknown nodes have no size, and SFTP doesn't
218     # require us to make one up.
219     #
220     # childnode might be None, meaning that the file doesn't exist yet,
221     # but we're going to write it later.
222
223     if childnode and childnode.is_unknown():
224         perms = 0
225     elif childnode and IDirectoryNode.providedBy(childnode):
226         perms = S_IFDIR | 0777
227     else:
228         # For files, omit the size if we don't immediately know it.
229         if childnode and size is None:
230             size = childnode.get_size()
231         if size is not None:
232             assert isinstance(size, (int, long)) and not isinstance(size, bool), repr(size)
233             attrs['size'] = size
234         perms = S_IFREG | 0666
235
236     if metadata:
237         if metadata.get('no-write', False):
238             perms &= S_IFDIR | S_IFREG | 0555  # clear 'w' bits
239
240         # See webapi.txt for what these times mean.
241         # We would prefer to omit atime, but SFTP version 3 can only
242         # accept mtime if atime is also set.
243         if 'linkmotime' in metadata.get('tahoe', {}):
244             attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['tahoe']['linkmotime'])
245         elif 'mtime' in metadata:
246             attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['mtime'])
247
248         if 'linkcrtime' in metadata.get('tahoe', {}):
249             attrs['createtime'] = _to_sftp_time(metadata['tahoe']['linkcrtime'])
250
251         if 'ctime' in metadata:
252             attrs['ctime'] = _to_sftp_time(metadata['ctime'])
253
254     attrs['permissions'] = perms
255
256     # twisted.conch.ssh.filetransfer only implements SFTP version 3,
257     # which doesn't include SSH_FILEXFER_ATTR_FLAGS.
258
259     return attrs
260
261
262 def _attrs_to_metadata(attrs):
263     metadata = {}
264
265     for key in attrs:
266         if key == "mtime" or key == "ctime" or key == "createtime":
267             metadata[key] = long(attrs[key])
268         elif key.startswith("ext_"):
269             metadata[key] = str(attrs[key])
270
271     perms = attrs.get('permissions', stat.S_IWUSR)
272     if not (perms & stat.S_IWUSR):
273         metadata['no-write'] = True
274
275     return metadata
276
277
278 def _direntry_for(filenode_or_parent, childname, filenode=None):
279     assert isinstance(childname, (unicode, NoneType)), childname
280
281     if childname is None:
282         filenode_or_parent = filenode
283
284     if filenode_or_parent:
285         rw_uri = filenode_or_parent.get_write_uri()
286         if rw_uri and childname:
287             return rw_uri + "/" + childname.encode('utf-8')
288         else:
289             return rw_uri
290
291     return None
292
293
294 class EncryptedTemporaryFile(PrefixingLogMixin):
295     # not implemented: next, readline, readlines, xreadlines, writelines
296
297     def __init__(self):
298         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
299         self.file = tempfile.TemporaryFile()
300         self.key = os.urandom(16)  # AES-128
301
302     def _crypt(self, offset, data):
303         # TODO: use random-access AES (pycryptopp ticket #18)
304         offset_big = offset // 16
305         offset_small = offset % 16
306         iv = binascii.unhexlify("%032x" % offset_big)
307         cipher = AES(self.key, iv=iv)
308         cipher.process("\x00"*offset_small)
309         return cipher.process(data)
310
311     def close(self):
312         self.file.close()
313
314     def flush(self):
315         self.file.flush()
316
317     def seek(self, offset, whence=0):  # 0 = SEEK_SET
318         if noisy: self.log(".seek(%r, %r)" % (offset, whence), level=NOISY)
319         self.file.seek(offset, whence)
320
321     def tell(self):
322         offset = self.file.tell()
323         if noisy: self.log(".tell() = %r" % (offset,), level=NOISY)
324         return offset
325
326     def read(self, size=-1):
327         if noisy: self.log(".read(%r)" % (size,), level=NOISY)
328         index = self.file.tell()
329         ciphertext = self.file.read(size)
330         plaintext = self._crypt(index, ciphertext)
331         return plaintext
332
333     def write(self, plaintext):
334         if noisy: self.log(".write(<data of length %r>)" % (len(plaintext),), level=NOISY)
335         index = self.file.tell()
336         ciphertext = self._crypt(index, plaintext)
337         self.file.write(ciphertext)
338
339     def truncate(self, newsize):
340         if noisy: self.log(".truncate(%r)" % (newsize,), level=NOISY)
341         self.file.truncate(newsize)
342
343
344 class OverwriteableFileConsumer(PrefixingLogMixin):
345     implements(IFinishableConsumer)
346     """I act both as a consumer for the download of the original file contents, and as a
347     wrapper for a temporary file that records the downloaded data and any overwrites.
348     I use a priority queue to keep track of which regions of the file have been overwritten
349     but not yet downloaded, so that the download does not clobber overwritten data.
350     I use another priority queue to record milestones at which to make callbacks
351     indicating that a given number of bytes have been downloaded.
352
353     The temporary file reflects the contents of the file that I represent, except that:
354      - regions that have neither been downloaded nor overwritten, if present,
355        contain garbage.
356      - the temporary file may be shorter than the represented file (it is never longer).
357        The latter's current size is stored in self.current_size.
358
359     This abstraction is mostly independent of SFTP. Consider moving it, if it is found
360     useful for other frontends."""
361
362     def __init__(self, download_size, tempfile_maker):
363         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
364         if noisy: self.log(".__init__(%r, %r)" % (download_size, tempfile_maker), level=NOISY)
365         self.download_size = download_size
366         self.current_size = download_size
367         self.f = tempfile_maker()
368         self.downloaded = 0
369         self.milestones = []  # empty heap of (offset, d)
370         self.overwrites = []  # empty heap of (start, end)
371         self.is_closed = False
372         self.done = self.when_reached(download_size)  # adds a milestone
373         self.is_done = False
374         def _signal_done(ign):
375             if noisy: self.log("DONE", level=NOISY)
376             self.is_done = True
377         self.done.addCallback(_signal_done)
378         self.producer = None
379
380     def get_file(self):
381         return self.f
382
383     def get_current_size(self):
384         return self.current_size
385
386     def set_current_size(self, size):
387         if noisy: self.log(".set_current_size(%r), current_size = %r, downloaded = %r" %
388                            (size, self.current_size, self.downloaded), level=NOISY)
389         if size < self.current_size or size < self.downloaded:
390             self.f.truncate(size)
391         if size > self.current_size:
392             self.overwrite(self.current_size, "\x00" * (size - self.current_size))
393         self.current_size = size
394
395         # invariant: self.download_size <= self.current_size
396         if size < self.download_size:
397             self.download_size = size
398         if self.downloaded >= self.download_size:
399             self.finish()
400
401     def registerProducer(self, p, streaming):
402         if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY)
403         self.producer = p
404         if streaming:
405             # call resumeProducing once to start things off
406             p.resumeProducing()
407         else:
408             def _iterate():
409                 if not self.is_done:
410                     p.resumeProducing()
411                     eventually(_iterate)
412             _iterate()
413
414     def write(self, data):
415         if noisy: self.log(".write(<data of length %r>)" % (len(data),), level=NOISY)
416         if self.is_closed:
417             return
418
419         if self.downloaded >= self.download_size:
420             return
421
422         next_downloaded = self.downloaded + len(data)
423         if next_downloaded > self.download_size:
424             data = data[:(self.download_size - self.downloaded)]
425
426         while len(self.overwrites) > 0:
427             (start, end) = self.overwrites[0]
428             if start >= next_downloaded:
429                 # This and all remaining overwrites are after the data we just downloaded.
430                 break
431             if start > self.downloaded:
432                 # The data we just downloaded has been partially overwritten.
433                 # Write the prefix of it that precedes the overwritten region.
434                 self.f.seek(self.downloaded)
435                 self.f.write(data[:(start - self.downloaded)])
436
437             # This merges consecutive overwrites if possible, which allows us to detect the
438             # case where the download can be stopped early because the remaining region
439             # to download has already been fully overwritten.
440             heapq.heappop(self.overwrites)
441             while len(self.overwrites) > 0:
442                 (start1, end1) = self.overwrites[0]
443                 if start1 > end:
444                     break
445                 end = end1
446                 heapq.heappop(self.overwrites)
447
448             if end >= next_downloaded:
449                 # This overwrite extends past the downloaded data, so there is no
450                 # more data to consider on this call.
451                 heapq.heappush(self.overwrites, (next_downloaded, end))
452                 self._update_downloaded(next_downloaded)
453                 return
454             elif end >= self.downloaded:
455                 data = data[(end - self.downloaded):]
456                 self._update_downloaded(end)
457
458         self.f.seek(self.downloaded)
459         self.f.write(data)
460         self._update_downloaded(next_downloaded)
461
462     def _update_downloaded(self, new_downloaded):
463         self.downloaded = new_downloaded
464         milestone = new_downloaded
465         if len(self.overwrites) > 0:
466             (start, end) = self.overwrites[0]
467             if start <= new_downloaded and end > milestone:
468                 milestone = end
469
470         while len(self.milestones) > 0:
471             (next, d) = self.milestones[0]
472             if next > milestone:
473                 return
474             if noisy: self.log("MILESTONE %r %r" % (next, d), level=NOISY)
475             heapq.heappop(self.milestones)
476             eventually_callback(d)(None)
477
478         if milestone >= self.download_size:
479             self.finish()
480
481     def overwrite(self, offset, data):
482         if noisy: self.log(".overwrite(%r, <data of length %r>)" % (offset, len(data)), level=NOISY)
483         if offset > self.current_size:
484             # Normally writing at an offset beyond the current end-of-file
485             # would leave a hole that appears filled with zeroes. However, an
486             # EncryptedTemporaryFile doesn't behave like that (if there is a
487             # hole in the file on disk, the zeroes that are read back will be
488             # XORed with the keystream). So we must explicitly write zeroes in
489             # the gap between the current EOF and the offset.
490
491             self.f.seek(self.current_size)
492             self.f.write("\x00" * (offset - self.current_size))
493             start = self.current_size
494         else:
495             self.f.seek(offset)
496             start = offset
497
498         self.f.write(data)
499         end = offset + len(data)
500         self.current_size = max(self.current_size, end)
501         if end > self.downloaded:
502             heapq.heappush(self.overwrites, (start, end))
503
504     def read(self, offset, length):
505         """When the data has been read, callback the Deferred that we return with this data.
506         Otherwise errback the Deferred that we return.
507         The caller must perform no more overwrites until the Deferred has fired."""
508
509         if noisy: self.log(".read(%r, %r), current_size = %r" % (offset, length, self.current_size), level=NOISY)
510         if offset >= self.current_size:
511             def _eof(): raise EOFError("read past end of file")
512             return defer.execute(_eof)
513
514         if offset + length > self.current_size:
515             length = self.current_size - offset
516             if noisy: self.log("truncating read to %r bytes" % (length,), level=NOISY)
517
518         needed = min(offset + length, self.download_size)
519         d = self.when_reached(needed)
520         def _reached(ign):
521             # It is not necessarily the case that self.downloaded >= needed, because
522             # the file might have been truncated (thus truncating the download) and
523             # then extended.
524
525             assert self.current_size >= offset + length, (self.current_size, offset, length)
526             if noisy: self.log("self.f = %r" % (self.f,), level=NOISY)
527             self.f.seek(offset)
528             return self.f.read(length)
529         d.addCallback(_reached)
530         return d
531
532     def when_reached(self, index):
533         if noisy: self.log(".when_reached(%r)" % (index,), level=NOISY)
534         if index <= self.downloaded:  # already reached
535             if noisy: self.log("already reached %r" % (index,), level=NOISY)
536             return defer.succeed(None)
537         d = defer.Deferred()
538         def _reached(ign):
539             if noisy: self.log("reached %r" % (index,), level=NOISY)
540             return ign
541         d.addCallback(_reached)
542         heapq.heappush(self.milestones, (index, d))
543         return d
544
545     def when_done(self):
546         return self.done
547
548     def finish(self):
549         while len(self.milestones) > 0:
550             (next, d) = self.milestones[0]
551             if noisy: self.log("MILESTONE FINISH %r %r" % (next, d), level=NOISY)
552             heapq.heappop(self.milestones)
553             # The callback means that the milestone has been reached if
554             # it is ever going to be. Note that the file may have been
555             # truncated to before the milestone.
556             eventually_callback(d)(None)
557
558         # FIXME: causes spurious failures
559         #self.unregisterProducer()
560
561     def close(self):
562         if not self.is_closed:
563             self.is_closed = True
564             try:
565                 self.f.close()
566             except BaseException, e:
567                 self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
568         self.finish()
569
570     def unregisterProducer(self):
571         if self.producer:
572             self.producer.stopProducing()
573             self.producer = None
574
575
576 SIZE_THRESHOLD = 1000
577
578
579 class ShortReadOnlySFTPFile(PrefixingLogMixin):
580     implements(ISFTPFile)
581     """I represent a file handle to a particular file on an SFTP connection.
582     I am used only for short immutable files opened in read-only mode.
583     The file contents are downloaded to memory when I am created."""
584
585     def __init__(self, userpath, filenode, metadata):
586         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
587         if noisy: self.log(".__init__(%r, %r, %r)" % (userpath, filenode, metadata), level=NOISY)
588
589         assert isinstance(userpath, str) and IFileNode.providedBy(filenode), (userpath, filenode)
590         self.filenode = filenode
591         self.metadata = metadata
592         self.async = download_to_data(filenode)
593         self.closed = False
594
595     def readChunk(self, offset, length):
596         request = ".readChunk(%r, %r)" % (offset, length)
597         self.log(request, level=OPERATIONAL)
598
599         if self.closed:
600             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
601             return defer.execute(_closed)
602
603         d = defer.Deferred()
604         def _read(data):
605             if noisy: self.log("_read(<data of length %r>) in readChunk(%r, %r)" % (len(data), offset, length), level=NOISY)
606
607             # "In response to this request, the server will read as many bytes as it
608             #  can from the file (up to 'len'), and return them in a SSH_FXP_DATA
609             #  message.  If an error occurs or EOF is encountered before reading any
610             #  data, the server will respond with SSH_FXP_STATUS.  For normal disk
611             #  files, it is guaranteed that this will read the specified number of
612             #  bytes, or up to end of file."
613             #
614             # i.e. we respond with an EOF error iff offset is already at EOF.
615
616             if offset >= len(data):
617                 eventually_errback(d)(SFTPError(FX_EOF, "read at or past end of file"))
618             else:
619                 eventually_callback(d)(data[offset:min(offset+length, len(data))])
620             return data
621         self.async.addCallbacks(_read, eventually_errback(d))
622         d.addBoth(_convert_error, request)
623         return d
624
625     def writeChunk(self, offset, data):
626         self.log(".writeChunk(%r, <data of length %r>) denied" % (offset, len(data)), level=OPERATIONAL)
627
628         def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
629         return defer.execute(_denied)
630
631     def close(self):
632         self.log(".close()", level=OPERATIONAL)
633
634         self.closed = True
635         return defer.succeed(None)
636
637     def getAttrs(self):
638         request = ".getAttrs()"
639         self.log(request, level=OPERATIONAL)
640
641         if self.closed:
642             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
643             return defer.execute(_closed)
644
645         d = defer.execute(_populate_attrs, self.filenode, self.metadata)
646         d.addBoth(_convert_error, request)
647         return d
648
649     def setAttrs(self, attrs):
650         self.log(".setAttrs(%r) denied" % (attrs,), level=OPERATIONAL)
651         def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
652         return defer.execute(_denied)
653
654
655 class GeneralSFTPFile(PrefixingLogMixin):
656     implements(ISFTPFile)
657     """I represent a file handle to a particular file on an SFTP connection.
658     I wrap an instance of OverwriteableFileConsumer, which is responsible for
659     storing the file contents. In order to allow write requests to be satisfied
660     immediately, there is effectively a FIFO queue between requests made to this
661     file handle, and requests to my OverwriteableFileConsumer. This queue is
662     implemented by the callback chain of self.async.
663
664     When first constructed, I am in an 'unopened' state that causes most
665     operations to be delayed until 'open' is called."""
666
667     def __init__(self, userpath, flags, close_notify, convergence):
668         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
669         if noisy: self.log(".__init__(%r, %r = %r, %r, <convergence censored>)" %
670                            (userpath, flags, _repr_flags(flags), close_notify), level=NOISY)
671
672         assert isinstance(userpath, str), userpath
673         self.userpath = userpath
674         self.flags = flags
675         self.close_notify = close_notify
676         self.convergence = convergence
677         self.async = defer.Deferred()
678         # Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created.
679         self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL)
680         self.closed = False
681         self.abandoned = False
682         self.parent = None
683         self.childname = None
684         self.filenode = None
685         self.metadata = None
686
687         # self.consumer should only be relied on in callbacks for self.async, since it might
688         # not be set before then.
689         self.consumer = None
690
691     def open(self, parent=None, childname=None, filenode=None, metadata=None):
692         self.log(".open(parent=%r, childname=%r, filenode=%r, metadata=%r)" %
693                  (parent, childname, filenode, metadata), level=OPERATIONAL)
694
695         assert isinstance(childname, (unicode, NoneType)), childname
696         # If the file has been renamed, the new (parent, childname) takes precedence.
697         if self.parent is None:
698             self.parent = parent
699         if self.childname is None:
700             self.childname = childname
701         self.filenode = filenode
702         self.metadata = metadata
703
704         assert not self.closed, self
705         tempfile_maker = EncryptedTemporaryFile
706
707         if (self.flags & FXF_TRUNC) or not filenode:
708             # We're either truncating or creating the file, so we don't need the old contents.
709             self.consumer = OverwriteableFileConsumer(0, tempfile_maker)
710             self.consumer.finish()
711         else:
712             assert IFileNode.providedBy(filenode), filenode
713
714             # TODO: use download interface described in #993 when implemented.
715             if filenode.is_mutable():
716                 self.async.addCallback(lambda ign: filenode.download_best_version())
717                 def _downloaded(data):
718                     self.consumer = OverwriteableFileConsumer(len(data), tempfile_maker)
719                     self.consumer.write(data)
720                     self.consumer.finish()
721                     return None
722                 self.async.addCallback(_downloaded)
723             else:
724                 download_size = filenode.get_size()
725                 assert download_size is not None, "download_size is None"
726                 self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker)
727                 def _read(ign):
728                     if noisy: self.log("_read immutable", level=NOISY)
729                     filenode.read(self.consumer, 0, None)
730                 self.async.addCallback(_read)
731
732         eventually_callback(self.async)(None)
733
734         if noisy: self.log("open done", level=NOISY)
735         return self
736
737     def get_userpath(self):
738         return self.userpath
739
740     def get_direntry(self):
741         return _direntry_for(self.parent, self.childname)
742
743     def rename(self, new_userpath, new_parent, new_childname):
744         self.log(".rename(%r, %r, %r)" % (new_userpath, new_parent, new_childname), level=OPERATIONAL)
745
746         assert isinstance(new_userpath, str) and isinstance(new_childname, unicode), (new_userpath, new_childname)
747         self.userpath = new_userpath
748         self.parent = new_parent
749         self.childname = new_childname
750
751     def abandon(self):
752         self.log(".abandon()", level=OPERATIONAL)
753
754         self.abandoned = True
755
756     def sync(self, ign=None):
757         # The ign argument allows some_file.sync to be used as a callback.
758         self.log(".sync()", level=OPERATIONAL)
759
760         d = defer.Deferred()
761         self.async.addBoth(eventually_callback(d))
762         def _done(res):
763             if noisy: self.log("_done(%r) in .sync()" % (res,), level=NOISY)
764             return res
765         d.addBoth(_done)
766         return d
767
768     def get_metadata(self):
769         return self.metadata
770
771     def readChunk(self, offset, length):
772         request = ".readChunk(%r, %r)" % (offset, length)
773         self.log(request, level=OPERATIONAL)
774
775         if not (self.flags & FXF_READ):
776             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for reading")
777             return defer.execute(_denied)
778
779         if self.closed:
780             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
781             return defer.execute(_closed)
782
783         d = defer.Deferred()
784         def _read(ign):
785             if noisy: self.log("_read in readChunk(%r, %r)" % (offset, length), level=NOISY)
786             d2 = self.consumer.read(offset, length)
787             d2.addCallbacks(eventually_callback(d), eventually_errback(d))
788             # It is correct to drop d2 here.
789             return None
790         self.async.addCallbacks(_read, eventually_errback(d))
791         d.addBoth(_convert_error, request)
792         return d
793
794     def writeChunk(self, offset, data):
795         self.log(".writeChunk(%r, <data of length %r>)" % (offset, len(data)), level=OPERATIONAL)
796
797         if not (self.flags & FXF_WRITE):
798             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
799             return defer.execute(_denied)
800
801         if self.closed:
802             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")
803             return defer.execute(_closed)
804
805         self.has_changed = True
806
807         # Note that we return without waiting for the write to occur. Reads and
808         # close wait for prior writes, and will fail if any prior operation failed.
809         # This is ok because SFTP makes no guarantee that the write completes
810         # before the request does. In fact it explicitly allows write errors to be
811         # delayed until close:
812         #   "One should note that on some server platforms even a close can fail.
813         #    This can happen e.g. if the server operating system caches writes,
814         #    and an error occurs while flushing cached writes during the close."
815
816         def _write(ign):
817             if noisy: self.log("_write in .writeChunk(%r, <data of length %r>), current_size = %r" %
818                                (offset, len(data), self.consumer.get_current_size()), level=NOISY)
819             # FXF_APPEND means that we should always write at the current end of file.
820             write_offset = offset
821             if self.flags & FXF_APPEND:
822                 write_offset = self.consumer.get_current_size()
823
824             self.consumer.overwrite(write_offset, data)
825             if noisy: self.log("overwrite done", level=NOISY)
826             return None
827         self.async.addCallback(_write)
828         # don't addErrback to self.async, just allow subsequent async ops to fail.
829         return defer.succeed(None)
830
831     def close(self):
832         request = ".close()"
833         self.log(request, level=OPERATIONAL)
834
835         if self.closed:
836             return defer.succeed(None)
837
838         # This means that close has been called, not that the close has succeeded.
839         self.closed = True
840
841         if not (self.flags & (FXF_WRITE | FXF_CREAT)):
842             def _readonly_close():
843                 if self.consumer:
844                     self.consumer.close()
845             return defer.execute(_readonly_close)
846
847         # We must capture the abandoned, parent, and childname variables synchronously
848         # at the close call. This is needed by the correctness arguments in the comments
849         # for _abandon_any_heisenfiles and _rename_heisenfiles.
850         # Note that the file must have been opened before it can be closed.
851         abandoned = self.abandoned
852         parent = self.parent
853         childname = self.childname
854         
855         # has_changed is set when writeChunk is called, not when the write occurs, so
856         # it is correct to optimize out the commit if it is False at the close call.
857         has_changed = self.has_changed
858
859         def _committed(res):
860             if noisy: self.log("_committed(%r)" % (res,), level=NOISY)
861
862             self.consumer.close()
863
864             # We must close_notify before re-firing self.async.
865             if self.close_notify:
866                 self.close_notify(self.userpath, self.parent, self.childname, self)
867             return res
868
869         def _close(ign):
870             d2 = self.consumer.when_done()
871             if self.filenode and self.filenode.is_mutable():
872                 self.log("update mutable file %r childname=%r" % (self.filenode, childname), level=OPERATIONAL)
873                 if self.metadata.get('no-write', False) and not self.filenode.is_readonly():
874                     assert parent and childname, (parent, childname, self.metadata)
875                     d2.addCallback(lambda ign: parent.set_metadata_for(childname, self.metadata))
876
877                 d2.addCallback(lambda ign: self.consumer.get_current_size())
878                 d2.addCallback(lambda size: self.consumer.read(0, size))
879                 d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
880             else:
881                 def _add_file(ign):
882                     self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
883                     u = FileHandle(self.consumer.get_file(), self.convergence)
884                     return parent.add_file(childname, u, metadata=self.metadata)
885                 d2.addCallback(_add_file)
886
887             d2.addBoth(_committed)
888             return d2
889
890         d = defer.Deferred()
891
892         # If the file has been abandoned, we don't want the close operation to get "stuck",
893         # even if self.async fails to re-fire. Doing the close independently of self.async
894         # in that case ensures that dropping an ssh connection is sufficient to abandon
895         # any heisenfiles that were not explicitly closed in that connection.
896         if abandoned or not has_changed:
897             d.addCallback(_committed)
898         else:
899             self.async.addCallback(_close)
900
901         self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
902         d.addBoth(_convert_error, request)
903         return d
904
905     def getAttrs(self):
906         request = ".getAttrs()"
907         self.log(request, level=OPERATIONAL)
908
909         if self.closed:
910             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
911             return defer.execute(_closed)
912
913         # Optimization for read-only handles, when we already know the metadata.
914         if not (self.flags & (FXF_WRITE | FXF_CREAT)) and self.metadata and self.filenode and not self.filenode.is_mutable():
915             return defer.succeed(_populate_attrs(self.filenode, self.metadata))
916
917         d = defer.Deferred()
918         def _get(ign):
919             if noisy: self.log("_get(%r) in %r, filenode = %r, metadata = %r" % (ign, request, self.filenode, self.metadata), level=NOISY)
920
921             # self.filenode might be None, but that's ok.
922             attrs = _populate_attrs(self.filenode, self.metadata, size=self.consumer.get_current_size())
923             eventually_callback(d)(attrs)
924             return None
925         self.async.addCallbacks(_get, eventually_errback(d))
926         d.addBoth(_convert_error, request)
927         return d
928
929     def setAttrs(self, attrs, only_if_at=None):
930         request = ".setAttrs(%r, only_if_at=%r)" % (attrs, only_if_at)
931         self.log(request, level=OPERATIONAL)
932
933         if not (self.flags & FXF_WRITE):
934             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
935             return defer.execute(_denied)
936
937         if self.closed:
938             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot set attributes for a closed file handle")
939             return defer.execute(_closed)
940
941         size = attrs.get("size", None)
942         if size is not None and (not isinstance(size, (int, long)) or size < 0):
943             def _bad(): raise SFTPError(FX_BAD_MESSAGE, "new size is not a valid nonnegative integer")
944             return defer.execute(_bad)
945
946         d = defer.Deferred()
947         def _set(ign):
948             if noisy: self.log("_set(%r) in %r" % (ign, request), level=NOISY)
949             if only_if_at and only_if_at != _direntry_for(self.parent, self.childname, self.filenode):
950                 return None
951
952             now = time()
953             self.metadata = update_metadata(self.metadata, _attrs_to_metadata(attrs), now)
954             if size is not None:
955                 # TODO: should we refuse to truncate a file opened with FXF_APPEND?
956                 # <http://allmydata.org/trac/tahoe-lafs/ticket/1037#comment:20>
957                 self.consumer.set_current_size(size)
958             eventually_callback(d)(None)
959             return None
960         self.async.addCallbacks(_set, eventually_errback(d))
961         d.addBoth(_convert_error, request)
962         return d
963
964
965 class StoppableList:
966     def __init__(self, items):
967         self.items = items
968     def __iter__(self):
969         for i in self.items:
970             yield i
971     def close(self):
972         pass
973
974
975 class Reason:
976     def __init__(self, value):
977         self.value = value
978
979
980 # A "heisenfile" is a file that has been opened with write flags
981 # (FXF_WRITE and/or FXF_CREAT) and not yet close-notified.
982 # 'all_heisenfiles' maps from a direntry string to a list of
983 # GeneralSFTPFile.
984 #
985 # A direntry string is parent_write_uri + "/" + childname_utf8 for
986 # an immutable file, or file_write_uri for a mutable file.
987 # Updates to this dict are single-threaded.
988
989 all_heisenfiles = {}
990
991
992 class SFTPUserHandler(ConchUser, PrefixingLogMixin):
993     implements(ISFTPServer)
994     def __init__(self, client, rootnode, username):
995         ConchUser.__init__(self)
996         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=username)
997         if noisy: self.log(".__init__(%r, %r, %r)" % (client, rootnode, username), level=NOISY)
998
999         self.channelLookup["session"] = session.SSHSession
1000         self.subsystemLookup["sftp"] = FileTransferServer
1001
1002         self._client = client
1003         self._root = rootnode
1004         self._username = username
1005         self._convergence = client.convergence
1006
1007         # maps from UTF-8 paths for this user, to files written and still open
1008         self._heisenfiles = {}
1009
1010     def gotVersion(self, otherVersion, extData):
1011         self.log(".gotVersion(%r, %r)" % (otherVersion, extData), level=OPERATIONAL)
1012
1013         # advertise the same extensions as the OpenSSH SFTP server
1014         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
1015         return {'posix-rename@openssh.com': '1',
1016                 'statvfs@openssh.com': '2',
1017                 'fstatvfs@openssh.com': '2',
1018                }
1019
1020     def logout(self):
1021         self.log(".logout()", level=OPERATIONAL)
1022
1023         for files in self._heisenfiles.itervalues():
1024             for f in files:
1025                 f.abandon()
1026
1027     def _add_heisenfile_by_path(self, file):
1028         self.log("._add_heisenfile_by_path(%r)" % (file,), level=OPERATIONAL)
1029
1030         userpath = file.get_userpath()
1031         if userpath in self._heisenfiles:
1032             self._heisenfiles[userpath] += [file]
1033         else:
1034             self._heisenfiles[userpath] = [file]
1035
1036     def _add_heisenfile_by_direntry(self, file):
1037         self.log("._add_heisenfile_by_direntry(%r)" % (file,), level=OPERATIONAL)
1038
1039         direntry = file.get_direntry()
1040         if direntry:
1041             if direntry in all_heisenfiles:
1042                 all_heisenfiles[direntry] += [file]
1043             else:
1044                 all_heisenfiles[direntry] = [file]
1045
1046     def _abandon_any_heisenfiles(self, userpath, direntry):
1047         request = "._abandon_any_heisenfiles(%r, %r)" % (userpath, direntry)
1048         self.log(request, level=OPERATIONAL)
1049         
1050         assert isinstance(userpath, str), userpath
1051
1052         # First we synchronously mark all heisenfiles matching the userpath or direntry
1053         # as abandoned, and remove them from the two heisenfile dicts. Then we .sync()
1054         # each file that we abandoned.
1055         #
1056         # For each file, the call to .abandon() occurs:
1057         #   * before the file is closed, in which case it will never be committed
1058         #     (uploaded+linked or published); or
1059         #   * after it is closed but before it has been close_notified, in which case the
1060         #     .sync() ensures that it has been committed (successfully or not) before we
1061         #     return.
1062         #
1063         # This avoids a race that might otherwise cause the file to be committed after
1064         # the remove operation has completed.
1065         #
1066         # We return a Deferred that fires with True if any files were abandoned (this
1067         # does not mean that they were not committed; it is used to determine whether
1068         # a NoSuchChildError from the attempt to delete the file should be suppressed).
1069
1070         files = []
1071         if direntry in all_heisenfiles:
1072             files = all_heisenfiles[direntry]
1073             del all_heisenfiles[direntry]
1074         if userpath in self._heisenfiles:
1075             files += self._heisenfiles[userpath]
1076             del self._heisenfiles[userpath]
1077
1078         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1079
1080         for f in files:
1081             f.abandon()
1082
1083         d = defer.succeed(None)
1084         for f in files:
1085             d.addBoth(f.sync)
1086
1087         def _done(ign):
1088             self.log("done %r" % (request,), level=OPERATIONAL)
1089             return len(files) > 0
1090         d.addBoth(_done)
1091         return d
1092
1093     def _rename_heisenfiles(self, from_userpath, from_parent, from_childname,
1094                             to_userpath, to_parent, to_childname, overwrite=True):
1095         request = ("._rename_heisenfiles(%r, %r, %r, %r, %r, %r, overwrite=%r)" %
1096                    (from_userpath, from_parent, from_childname, to_userpath, to_parent, to_childname, overwrite))
1097         self.log(request, level=OPERATIONAL)
1098
1099         assert (isinstance(from_userpath, str) and isinstance(from_childname, unicode) and
1100                 isinstance(to_userpath, str) and isinstance(to_childname, unicode)), \
1101                (from_userpath, from_childname, to_userpath, to_childname)
1102
1103         if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
1104
1105         # First we synchronously rename all heisenfiles matching the userpath or direntry.
1106         # Then we .sync() each file that we renamed.
1107         #
1108         # For each file, the call to .rename occurs:
1109         #   * before the file is closed, in which case it will be committed at the
1110         #     new direntry; or
1111         #   * after it is closed but before it has been close_notified, in which case the
1112         #     .sync() ensures that it has been committed (successfully or not) before we
1113         #     return.
1114         #
1115         # This avoids a race that might otherwise cause the file to be committed at the
1116         # old name after the rename operation has completed.
1117         #
1118         # Note that if overwrite is False, the caller should already have checked
1119         # whether a real direntry exists at the destination. It is possible that another
1120         # direntry (heisen or real) comes to exist at the destination after that check,
1121         # but in that case it is correct for the rename to succeed (and for the commit
1122         # of the heisenfile at the destination to possibly clobber the other entry, since
1123         # that can happen anyway when we have concurrent write handles to the same direntry).
1124         #
1125         # We return a Deferred that fires with True if any files were renamed (this
1126         # does not mean that they were not committed; it is used to determine whether
1127         # a NoSuchChildError from the rename attempt should be suppressed). If overwrite
1128         # is False and there were already heisenfiles at the destination userpath or
1129         # direntry, we return a Deferred that fails with SFTPError(FX_PERMISSION_DENIED).
1130
1131         from_direntry = _direntry_for(from_parent, from_childname)
1132         to_direntry = _direntry_for(to_parent, to_childname)
1133
1134         if noisy: self.log("from_direntry = %r, to_direntry = %r in %r" %
1135                            (from_direntry, to_direntry, request), level=NOISY)
1136
1137         if not overwrite and (to_userpath in self._heisenfiles or to_direntry in all_heisenfiles):
1138             def _existing(): raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1139             if noisy: self.log("existing", level=NOISY)
1140             return defer.execute(_existing)
1141
1142         from_files = []
1143         if from_direntry in all_heisenfiles:
1144             from_files = all_heisenfiles[from_direntry]
1145             del all_heisenfiles[from_direntry]
1146         if from_userpath in self._heisenfiles:
1147             from_files += self._heisenfiles[from_userpath]
1148             del self._heisenfiles[from_userpath]
1149
1150         if noisy: self.log("from_files = %r in %r" % (from_files, request), level=NOISY)
1151
1152         for f in from_files:
1153             f.rename(to_userpath, to_parent, to_childname)
1154             self._add_heisenfile_by_path(f)
1155             self._add_heisenfile_by_direntry(f)
1156
1157         d = defer.succeed(None)
1158         for f in from_files:
1159             d.addBoth(f.sync)
1160
1161         def _done(ign):
1162             if noisy:
1163                 self.log("done %r\nall_heisenfiles = %r\nself._heisenfiles = %r" % (request, all_heisenfiles, self._heisenfiles), level=OPERATIONAL)
1164             else:  # pragma: no cover
1165                 self.log("done %r" % (request,), level=OPERATIONAL)
1166             return len(from_files) > 0
1167         d.addBoth(_done)
1168         return d
1169
1170     def _update_attrs_for_heisenfiles(self, userpath, direntry, attrs):
1171         request = "._update_attrs_for_heisenfiles(%r, %r, %r)" % (userpath, direntry, attrs)
1172         self.log(request, level=OPERATIONAL)
1173
1174         assert isinstance(userpath, str) and isinstance(direntry, str), (userpath, direntry)
1175
1176         files = []
1177         if direntry in all_heisenfiles:
1178             files = all_heisenfiles[direntry]
1179         if userpath in self._heisenfiles:
1180             files += self._heisenfiles[userpath]
1181
1182         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1183
1184         # We set the metadata for all heisenfiles at this path or direntry.
1185         # Since a direntry includes a write URI, we must have authority to
1186         # change the metadata of heisenfiles found in the all_heisenfiles dict.
1187         # However that's not necessarily the case for heisenfiles found by
1188         # path. Therefore we tell the setAttrs method of each file to only
1189         # perform the update if the file is at the correct direntry.
1190
1191         d = defer.succeed(None)
1192         for f in files:
1193             d.addBoth(f.setAttrs, attrs, only_if_at=direntry)
1194
1195         def _done(ign):
1196             self.log("done %r" % (request,), level=OPERATIONAL)
1197             # TODO: this should not return True if only_if_at caused all files to be skipped.
1198             return len(files) > 0
1199         d.addBoth(_done)
1200         return d
1201
1202     def _sync_heisenfiles(self, userpath, direntry, ignore=None):
1203         request = "._sync_heisenfiles(%r, %r, ignore=%r)" % (userpath, direntry, ignore)
1204         self.log(request, level=OPERATIONAL)
1205
1206         assert isinstance(userpath, str) and isinstance(direntry, (str, NoneType)), (userpath, direntry)
1207
1208         files = []
1209         if direntry in all_heisenfiles:
1210             files = all_heisenfiles[direntry]
1211         if userpath in self._heisenfiles:
1212             files += self._heisenfiles[userpath]
1213
1214         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1215
1216         d = defer.succeed(None)
1217         for f in files:
1218             if f is not ignore:
1219                 d.addBoth(f.sync)
1220
1221         def _done(ign):
1222             self.log("done %r" % (request,), level=OPERATIONAL)
1223             return None
1224         d.addBoth(_done)
1225         return d
1226
1227     def _remove_heisenfile(self, userpath, parent, childname, file_to_remove):
1228         if noisy: self.log("._remove_heisenfile(%r, %r, %r, %r)" % (userpath, parent, childname, file_to_remove), level=NOISY)
1229
1230         assert isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)), (userpath, childname)
1231
1232         direntry = _direntry_for(parent, childname)
1233         if direntry in all_heisenfiles:
1234             all_old_files = all_heisenfiles[direntry]
1235             all_new_files = [f for f in all_old_files if f is not file_to_remove]
1236             if len(all_new_files) > 0:
1237                 all_heisenfiles[direntry] = all_new_files
1238             else:
1239                 del all_heisenfiles[direntry]
1240
1241         if userpath in self._heisenfiles:
1242             old_files = self._heisenfiles[userpath]
1243             new_files = [f for f in old_files if f is not file_to_remove]
1244             if len(new_files) > 0:
1245                 self._heisenfiles[userpath] = new_files
1246             else:
1247                 del self._heisenfiles[userpath]
1248
1249         if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
1250
1251     def _make_file(self, existing_file, userpath, flags, parent=None, childname=None, filenode=None, metadata=None):
1252         if noisy: self.log("._make_file(%r, %r, %r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
1253                            (existing_file, userpath, flags, _repr_flags(flags), parent, childname, filenode, metadata),
1254                            level=NOISY)
1255
1256         assert (isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)) and
1257                 (metadata is None or 'no-write' in metadata)), (userpath, childname, metadata)
1258
1259         writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
1260         direntry = _direntry_for(parent, childname, filenode)
1261
1262         d = self._sync_heisenfiles(userpath, direntry, ignore=existing_file)
1263
1264         if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
1265             d.addCallback(lambda ign: ShortReadOnlySFTPFile(userpath, filenode, metadata))
1266         else:
1267             close_notify = None
1268             if writing:
1269                 close_notify = self._remove_heisenfile
1270
1271             d.addCallback(lambda ign: existing_file or GeneralSFTPFile(userpath, flags, close_notify, self._convergence))
1272             def _got_file(file):
1273                 file.open(parent=parent, childname=childname, filenode=filenode, metadata=metadata)
1274                 if writing:
1275                     self._add_heisenfile_by_direntry(file)
1276                 return file
1277             d.addCallback(_got_file)
1278         return d
1279
1280     def openFile(self, pathstring, flags, attrs, delay=None):
1281         request = ".openFile(%r, %r = %r, %r, delay=%r)" % (pathstring, flags, _repr_flags(flags), attrs, delay)
1282         self.log(request, level=OPERATIONAL)
1283
1284         # This is used for both reading and writing.
1285         # First exclude invalid combinations of flags, and empty paths.
1286
1287         if not (flags & (FXF_READ | FXF_WRITE)):
1288             def _bad_readwrite():
1289                 raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
1290             return defer.execute(_bad_readwrite)
1291
1292         if (flags & FXF_EXCL) and not (flags & FXF_CREAT):
1293             def _bad_exclcreat():
1294                 raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT")
1295             return defer.execute(_bad_exclcreat)
1296
1297         path = self._path_from_string(pathstring)
1298         if not path:
1299             def _emptypath(): raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty")
1300             return defer.execute(_emptypath)
1301
1302         # The combination of flags is potentially valid.
1303
1304         # To work around clients that have race condition bugs, a getAttr, rename, or
1305         # remove request following an 'open' request with FXF_WRITE or FXF_CREAT flags,
1306         # should succeed even if the 'open' request has not yet completed. So we now
1307         # synchronously add a file object into the self._heisenfiles dict, indexed
1308         # by its UTF-8 userpath. (We can't yet add it to the all_heisenfiles dict,
1309         # because we don't yet have a user-independent path for the file.) The file
1310         # object does not know its filenode, parent, or childname at this point.
1311
1312         userpath = self._path_to_utf8(path)
1313
1314         if flags & (FXF_WRITE | FXF_CREAT):
1315             file = GeneralSFTPFile(userpath, flags, self._remove_heisenfile, self._convergence)
1316             self._add_heisenfile_by_path(file)
1317         else:
1318             # We haven't decided which file implementation to use yet.
1319             file = None
1320
1321         desired_metadata = _attrs_to_metadata(attrs)
1322
1323         # Now there are two major cases:
1324         #
1325         #  1. The path is specified as /uri/FILECAP, with no parent directory.
1326         #     If the FILECAP is mutable and writeable, then we can open it in write-only
1327         #     or read/write mode (non-exclusively), otherwise we can only open it in
1328         #     read-only mode. The open should succeed immediately as long as FILECAP is
1329         #     a valid known filecap that grants the required permission.
1330         #
1331         #  2. The path is specified relative to a parent. We find the parent dirnode and
1332         #     get the child's URI and metadata if it exists. There are four subcases:
1333         #       a. the child does not exist: FXF_CREAT must be set, and we must be able
1334         #          to write to the parent directory.
1335         #       b. the child exists but is not a valid known filecap: fail
1336         #       c. the child is mutable: if we are trying to open it write-only or
1337         #          read/write, then we must be able to write to the file.
1338         #       d. the child is immutable: if we are trying to open it write-only or
1339         #          read/write, then we must be able to write to the parent directory.
1340         #
1341         # To reduce latency, open normally succeeds as soon as these conditions are
1342         # met, even though there might be a failure in downloading the existing file
1343         # or uploading a new one. However, there is an exception: if a file has been
1344         # written, then closed, and is now being reopened, then we have to delay the
1345         # open until the previous upload/publish has completed. This is necessary
1346         # because sshfs does not wait for the result of an FXF_CLOSE message before
1347         # reporting to the client that a file has been closed. It applies both to
1348         # mutable files, and to directory entries linked to an immutable file.
1349         #
1350         # Note that the permission checks below are for more precise error reporting on
1351         # the open call; later operations would fail even if we did not make these checks.
1352
1353         d = delay or defer.succeed(None)
1354         d.addCallback(lambda ign: self._get_root(path))
1355         def _got_root( (root, path) ):
1356             if root.is_unknown():
1357                 raise SFTPError(FX_PERMISSION_DENIED,
1358                                 "cannot open an unknown cap (or child of an unknown object). "
1359                                 "Upgrading the gateway to a later Tahoe-LAFS version may help")
1360             if not path:
1361                 # case 1
1362                 if noisy: self.log("case 1: root = %r, path[:-1] = %r" % (root, path[:-1]), level=NOISY)
1363                 if not IFileNode.providedBy(root):
1364                     raise SFTPError(FX_PERMISSION_DENIED,
1365                                     "cannot open a directory cap")
1366                 if (flags & FXF_WRITE) and root.is_readonly():
1367                     raise SFTPError(FX_PERMISSION_DENIED,
1368                                     "cannot write to a non-writeable filecap without a parent directory")
1369                 if flags & FXF_EXCL:
1370                     raise SFTPError(FX_FAILURE,
1371                                     "cannot create a file exclusively when it already exists")
1372
1373                 # The file does not need to be added to all_heisenfiles, because it is not
1374                 # associated with a directory entry that needs to be updated.
1375
1376                 metadata = update_metadata(None, desired_metadata, time())
1377
1378                 # We have to decide what to pass for the 'parent_readonly' argument to _no_write,
1379                 # given that we don't actually have a parent. This only affects the permissions
1380                 # reported by a getAttrs on this file handle in the case of an immutable file.
1381                 # We choose 'parent_readonly=True' since that will cause the permissions to be
1382                 # reported as r--r--r--, which is appropriate because an immutable file can't be
1383                 # written via this path.
1384
1385                 metadata['no-write'] = _no_write(True, root)
1386                 return self._make_file(file, userpath, flags, filenode=root, metadata=metadata)
1387             else:
1388                 # case 2
1389                 childname = path[-1]
1390
1391                 if noisy: self.log("case 2: root = %r, childname = %r, desired_metadata = %r, path[:-1] = %r" %
1392                                    (root, childname, desired_metadata, path[:-1]), level=NOISY)
1393                 d2 = root.get_child_at_path(path[:-1])
1394                 def _got_parent(parent):
1395                     if noisy: self.log("_got_parent(%r)" % (parent,), level=NOISY)
1396                     if parent.is_unknown():
1397                         raise SFTPError(FX_PERMISSION_DENIED,
1398                                         "cannot open a child of an unknown object. "
1399                                         "Upgrading the gateway to a later Tahoe-LAFS version may help")
1400
1401                     parent_readonly = parent.is_readonly()
1402                     d3 = defer.succeed(None)
1403                     if flags & FXF_EXCL:
1404                         # FXF_EXCL means that the link to the file (not the file itself) must
1405                         # be created atomically wrt updates by this storage client.
1406                         # That is, we need to create the link before returning success to the
1407                         # SFTP open request (and not just on close, as would normally be the
1408                         # case). We make the link initially point to a zero-length LIT file,
1409                         # which is consistent with what might happen on a POSIX filesystem.
1410
1411                         if parent_readonly:
1412                             raise SFTPError(FX_FAILURE,
1413                                             "cannot create a file exclusively when the parent directory is read-only")
1414
1415                         # 'overwrite=False' ensures failure if the link already exists.
1416                         # FIXME: should use a single call to set_uri and return (child, metadata) (#1035)
1417
1418                         zero_length_lit = "URI:LIT:"
1419                         if noisy: self.log("%r.set_uri(%r, None, readcap=%r, overwrite=False)" %
1420                                            (parent, zero_length_lit, childname), level=NOISY)
1421                         d3.addCallback(lambda ign: parent.set_uri(childname, None, readcap=zero_length_lit,
1422                                                                   metadata=desired_metadata, overwrite=False))
1423                         def _seturi_done(child):
1424                             if noisy: self.log("%r.get_metadata_for(%r)" % (parent, childname), level=NOISY)
1425                             d4 = parent.get_metadata_for(childname)
1426                             d4.addCallback(lambda metadata: (child, metadata))
1427                             return d4
1428                         d3.addCallback(_seturi_done)
1429                     else:
1430                         if noisy: self.log("%r.get_child_and_metadata(%r)" % (parent, childname), level=NOISY)
1431                         d3.addCallback(lambda ign: parent.get_child_and_metadata(childname))
1432
1433                     def _got_child( (filenode, current_metadata) ):
1434                         if noisy: self.log("_got_child( (%r, %r) )" % (filenode, current_metadata), level=NOISY)
1435
1436                         metadata = update_metadata(current_metadata, desired_metadata, time())
1437
1438                         # Ignore the permissions of the desired_metadata in an open call. The permissions
1439                         # can only be set by setAttrs.
1440                         metadata['no-write'] = _no_write(parent_readonly, filenode, current_metadata)
1441
1442                         if filenode.is_unknown():
1443                             raise SFTPError(FX_PERMISSION_DENIED,
1444                                             "cannot open an unknown cap. Upgrading the gateway "
1445                                             "to a later Tahoe-LAFS version may help")
1446                         if not IFileNode.providedBy(filenode):
1447                             raise SFTPError(FX_PERMISSION_DENIED,
1448                                             "cannot open a directory as if it were a file")
1449                         if (flags & FXF_WRITE) and metadata['no-write']:
1450                             raise SFTPError(FX_PERMISSION_DENIED,
1451                                             "cannot open a non-writeable file for writing")
1452
1453                         return self._make_file(file, userpath, flags, parent=parent, childname=childname,
1454                                                filenode=filenode, metadata=metadata)
1455                     def _no_child(f):
1456                         if noisy: self.log("_no_child(%r)" % (f,), level=NOISY)
1457                         f.trap(NoSuchChildError)
1458
1459                         if not (flags & FXF_CREAT):
1460                             raise SFTPError(FX_NO_SUCH_FILE,
1461                                             "the file does not exist, and was not opened with the creation (CREAT) flag")
1462                         if parent_readonly:
1463                             raise SFTPError(FX_PERMISSION_DENIED,
1464                                             "cannot create a file when the parent directory is read-only")
1465
1466                         return self._make_file(file, userpath, flags, parent=parent, childname=childname)
1467                     d3.addCallbacks(_got_child, _no_child)
1468                     return d3
1469
1470                 d2.addCallback(_got_parent)
1471                 return d2
1472
1473         d.addCallback(_got_root)
1474         def _remove_on_error(err):
1475             if file:
1476                 self._remove_heisenfile(userpath, None, None, file)
1477             return err
1478         d.addErrback(_remove_on_error)
1479         d.addBoth(_convert_error, request)
1480         return d
1481
1482     def renameFile(self, from_pathstring, to_pathstring, overwrite=False):
1483         request = ".renameFile(%r, %r)" % (from_pathstring, to_pathstring)
1484         self.log(request, level=OPERATIONAL)
1485
1486         from_path = self._path_from_string(from_pathstring)
1487         to_path = self._path_from_string(to_pathstring)
1488         from_userpath = self._path_to_utf8(from_path)
1489         to_userpath = self._path_to_utf8(to_path)
1490
1491         # the target directory must already exist
1492         d = deferredutil.gatherResults([self._get_parent_or_node(from_path),
1493                                         self._get_parent_or_node(to_path)])
1494         def _got( (from_pair, to_pair) ):
1495             if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r, overwrite=%r)" %
1496                                (from_pair, to_pair, from_pathstring, to_pathstring, overwrite), level=NOISY)
1497             (from_parent, from_childname) = from_pair
1498             (to_parent, to_childname) = to_pair
1499
1500             if from_childname is None:
1501                 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename a source object specified by URI")
1502             if to_childname is None:
1503                 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename to a destination specified by URI")
1504
1505             # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.5>
1506             # "It is an error if there already exists a file with the name specified
1507             #  by newpath."
1508             # OpenSSH's SFTP server returns FX_PERMISSION_DENIED for this error.
1509             #
1510             # For the standard SSH_FXP_RENAME operation, overwrite=False.
1511             # We also support the posix-rename@openssh.com extension, which uses overwrite=True.
1512
1513             d2 = defer.fail(NoSuchChildError())
1514             if not overwrite:
1515                 d2.addCallback(lambda ign: to_parent.get(to_childname))
1516             def _expect_fail(res):
1517                 if not isinstance(res, Failure):
1518                     raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1519
1520                 # It is OK if we fail for errors other than NoSuchChildError, since that probably
1521                 # indicates some problem accessing the destination directory.
1522                 res.trap(NoSuchChildError)
1523             d2.addBoth(_expect_fail)
1524
1525             # If there are heisenfiles to be written at the 'from' direntry, then ensure
1526             # they will now be written at the 'to' direntry instead.
1527             d2.addCallback(lambda ign:
1528                            self._rename_heisenfiles(from_userpath, from_parent, from_childname,
1529                                                     to_userpath, to_parent, to_childname, overwrite=overwrite))
1530
1531             def _move(renamed):
1532                 # FIXME: use move_child_to_path to avoid possible data loss due to #943
1533                 #d3 = from_parent.move_child_to_path(from_childname, to_root, to_path, overwrite=overwrite)
1534
1535                 d3 = from_parent.move_child_to(from_childname, to_parent, to_childname, overwrite=overwrite)
1536                 def _check(err):
1537                     if noisy: self.log("_check(%r) in .renameFile(%r, %r, overwrite=%r)" %
1538                                        (err, from_pathstring, to_pathstring, overwrite), level=NOISY)
1539
1540                     if not isinstance(err, Failure) or (renamed and err.check(NoSuchChildError)):
1541                         return None
1542                     if not overwrite and err.check(ExistingChildError):
1543                         raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1544
1545                     return err
1546                 d3.addBoth(_check)
1547                 return d3
1548             d2.addCallback(_move)
1549             return d2
1550         d.addCallback(_got)
1551         d.addBoth(_convert_error, request)
1552         return d
1553
1554     def makeDirectory(self, pathstring, attrs):
1555         request = ".makeDirectory(%r, %r)" % (pathstring, attrs)
1556         self.log(request, level=OPERATIONAL)
1557
1558         path = self._path_from_string(pathstring)
1559         metadata = _attrs_to_metadata(attrs)
1560         if 'no-write' in metadata:
1561             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "cannot create a directory that is initially read-only")
1562             return defer.execute(_denied)
1563
1564         d = self._get_root(path)
1565         d.addCallback(lambda (root, path):
1566                       self._get_or_create_directories(root, path, metadata))
1567         d.addBoth(_convert_error, request)
1568         return d
1569
1570     def _get_or_create_directories(self, node, path, metadata):
1571         if not IDirectoryNode.providedBy(node):
1572             # TODO: provide the name of the blocking file in the error message.
1573             def _blocked(): raise SFTPError(FX_FAILURE, "cannot create directory because there "
1574                                                         "is a file in the way") # close enough
1575             return defer.execute(_blocked)
1576
1577         if not path:
1578             return defer.succeed(node)
1579         d = node.get(path[0])
1580         def _maybe_create(f):
1581             f.trap(NoSuchChildError)
1582             return node.create_subdirectory(path[0])
1583         d.addErrback(_maybe_create)
1584         d.addCallback(self._get_or_create_directories, path[1:], metadata)
1585         return d
1586
1587     def removeFile(self, pathstring):
1588         request = ".removeFile(%r)" % (pathstring,)
1589         self.log(request, level=OPERATIONAL)
1590
1591         path = self._path_from_string(pathstring)
1592         d = self._remove_object(path, must_be_file=True)
1593         d.addBoth(_convert_error, request)
1594         return d
1595
1596     def removeDirectory(self, pathstring):
1597         request = ".removeDirectory(%r)" % (pathstring,)
1598         self.log(request, level=OPERATIONAL)
1599
1600         path = self._path_from_string(pathstring)
1601         d = self._remove_object(path, must_be_directory=True)
1602         d.addBoth(_convert_error, request)
1603         return d
1604
1605     def _remove_object(self, path, must_be_directory=False, must_be_file=False):
1606         userpath = self._path_to_utf8(path)
1607         d = self._get_parent_or_node(path)
1608         def _got_parent( (parent, childname) ):
1609             if childname is None:
1610                 raise SFTPError(FX_NO_SUCH_FILE, "cannot remove an object specified by URI")
1611
1612             direntry = _direntry_for(parent, childname)
1613             d2 = defer.succeed(False)
1614             if not must_be_directory:
1615                 d2.addCallback(lambda ign: self._abandon_any_heisenfiles(userpath, direntry))
1616
1617             d2.addCallback(lambda abandoned:
1618                            parent.delete(childname, must_exist=not abandoned,
1619                                          must_be_directory=must_be_directory, must_be_file=must_be_file))
1620             return d2
1621         d.addCallback(_got_parent)
1622         return d
1623
1624     def openDirectory(self, pathstring):
1625         request = ".openDirectory(%r)" % (pathstring,)
1626         self.log(request, level=OPERATIONAL)
1627
1628         path = self._path_from_string(pathstring)
1629         d = self._get_parent_or_node(path)
1630         def _got_parent_or_node( (parent_or_node, childname) ):
1631             if noisy: self.log("_got_parent_or_node( (%r, %r) ) in openDirectory(%r)" %
1632                                (parent_or_node, childname, pathstring), level=NOISY)
1633             if childname is None:
1634                 return parent_or_node
1635             else:
1636                 return parent_or_node.get(childname)
1637         d.addCallback(_got_parent_or_node)
1638         def _list(dirnode):
1639             if dirnode.is_unknown():
1640                 raise SFTPError(FX_PERMISSION_DENIED,
1641                                 "cannot list an unknown cap as a directory. Upgrading the gateway "
1642                                 "to a later Tahoe-LAFS version may help")
1643             if not IDirectoryNode.providedBy(dirnode):
1644                 raise SFTPError(FX_PERMISSION_DENIED,
1645                                 "cannot list a file as if it were a directory")
1646
1647             d2 = dirnode.list()
1648             def _render(children):
1649                 parent_readonly = dirnode.is_readonly()
1650                 results = []
1651                 for filename, (child, metadata) in children.iteritems():
1652                     # The file size may be cached or absent.
1653                     metadata['no-write'] = _no_write(parent_readonly, child, metadata)
1654                     attrs = _populate_attrs(child, metadata)
1655                     filename_utf8 = filename.encode('utf-8')
1656                     longname = _lsLine(filename_utf8, attrs)
1657                     results.append( (filename_utf8, longname, attrs) )
1658                 return StoppableList(results)
1659             d2.addCallback(_render)
1660             return d2
1661         d.addCallback(_list)
1662         d.addBoth(_convert_error, request)
1663         return d
1664
1665     def getAttrs(self, pathstring, followLinks):
1666         request = ".getAttrs(%r, followLinks=%r)" % (pathstring, followLinks)
1667         self.log(request, level=OPERATIONAL)
1668
1669         # When asked about a specific file, report its current size.
1670         # TODO: the modification time for a mutable file should be
1671         # reported as the update time of the best version. But that
1672         # information isn't currently stored in mutable shares, I think.
1673
1674         path = self._path_from_string(pathstring)
1675         userpath = self._path_to_utf8(path)
1676         d = self._get_parent_or_node(path)
1677         def _got_parent_or_node( (parent_or_node, childname) ):
1678             if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
1679
1680             # Some clients will incorrectly try to get the attributes
1681             # of a file immediately after opening it, before it has been put
1682             # into the all_heisenfiles table. This is a race condition bug in
1683             # the client, but we handle it anyway by calling .sync() on all
1684             # files matching either the path or the direntry.
1685
1686             direntry = _direntry_for(parent_or_node, childname)
1687             d2 = self._sync_heisenfiles(userpath, direntry)
1688
1689             if childname is None:
1690                 node = parent_or_node
1691                 d2.addCallback(lambda ign: node.get_current_size())
1692                 d2.addCallback(lambda size:
1693                                _populate_attrs(node, {'no-write': node.is_unknown() or node.is_readonly()}, size=size))
1694             else:
1695                 parent = parent_or_node
1696                 d2.addCallback(lambda ign: parent.get_child_and_metadata_at_path([childname]))
1697                 def _got( (child, metadata) ):
1698                     if noisy: self.log("_got( (%r, %r) )" % (child, metadata), level=NOISY)
1699                     assert IDirectoryNode.providedBy(parent), parent
1700                     metadata['no-write'] = _no_write(parent.is_readonly(), child, metadata)
1701                     d3 = child.get_current_size()
1702                     d3.addCallback(lambda size: _populate_attrs(child, metadata, size=size))
1703                     return d3
1704                 def _nosuch(err):
1705                     if noisy: self.log("_nosuch(%r)" % (err,), level=NOISY)
1706                     err.trap(NoSuchChildError)
1707                     if noisy: self.log("checking open files:\nself._heisenfiles = %r\nall_heisenfiles = %r\ndirentry=%r" %
1708                                        (self._heisenfiles, all_heisenfiles, direntry), level=NOISY)
1709                     if direntry in all_heisenfiles:
1710                         files = all_heisenfiles[direntry]
1711                         if len(files) == 0:  # pragma: no cover
1712                             return err
1713                         # use the heisenfile that was most recently opened
1714                         return files[-1].getAttrs()
1715                     return err
1716                 d2.addCallbacks(_got, _nosuch)
1717             return d2
1718         d.addCallback(_got_parent_or_node)
1719         d.addBoth(_convert_error, request)
1720         return d
1721
1722     def setAttrs(self, pathstring, attrs):
1723         request = ".setAttrs(%r, %r)" % (pathstring, attrs)
1724         self.log(request, level=OPERATIONAL)
1725
1726         if "size" in attrs:
1727             # this would require us to download and re-upload the truncated/extended
1728             # file contents
1729             def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "setAttrs wth size attribute unsupported")
1730             return defer.execute(_unsupported)
1731
1732         path = self._path_from_string(pathstring)
1733         userpath = self._path_to_utf8(path)
1734         d = self._get_parent_or_node(path)
1735         def _got_parent_or_node( (parent_or_node, childname) ):
1736             if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
1737
1738             direntry = _direntry_for(parent_or_node, childname)
1739             d2 = self._update_attrs_for_heisenfiles(userpath, direntry, attrs)
1740
1741             def _update(updated_heisenfiles):
1742                 if childname is None:
1743                     if updated_heisenfiles:
1744                         return None
1745                     raise SFTPError(FX_NO_SUCH_FILE, userpath)
1746                 else:
1747                     desired_metadata = _attrs_to_metadata(attrs)
1748                     if noisy: self.log("desired_metadata = %r" % (desired_metadata,), level=NOISY)
1749
1750                     d3 = parent_or_node.set_metadata_for(childname, desired_metadata)
1751                     def _nosuch(err):
1752                         if updated_heisenfiles:
1753                             err.trap(NoSuchChildError)
1754                         else:
1755                             return err
1756                     d3.addErrback(_nosuch)
1757                     return d3
1758             d2.addCallback(_update)
1759             d2.addCallback(lambda ign: None)
1760             return d2
1761         d.addCallback(_got_parent_or_node)
1762         d.addBoth(_convert_error, request)
1763         return d
1764
1765     def readLink(self, pathstring):
1766         self.log(".readLink(%r)" % (pathstring,), level=OPERATIONAL)
1767
1768         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "readLink")
1769         return defer.execute(_unsupported)
1770
1771     def makeLink(self, linkPathstring, targetPathstring):
1772         self.log(".makeLink(%r, %r)" % (linkPathstring, targetPathstring), level=OPERATIONAL)
1773
1774         # If this is implemented, note the reversal of arguments described in point 7 of
1775         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>.
1776
1777         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "makeLink")
1778         return defer.execute(_unsupported)
1779
1780     def extendedRequest(self, extensionName, extensionData):
1781         self.log(".extendedRequest(%r, <data of length %r>)" % (extensionName, len(extensionData)), level=OPERATIONAL)
1782
1783         # We implement the three main OpenSSH SFTP extensions; see
1784         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
1785
1786         if extensionName == 'posix-rename@openssh.com':
1787             def _bad(): raise SFTPError(FX_BAD_MESSAGE, "could not parse posix-rename@openssh.com request")
1788
1789             if 4 > len(extensionData): return defer.execute(_bad)
1790             (fromPathLen,) = struct.unpack('>L', extensionData[0:4])
1791             if 8 + fromPathLen > len(extensionData): return defer.execute(_bad)
1792
1793             (toPathLen,) = struct.unpack('>L', extensionData[(4 + fromPathLen):(8 + fromPathLen)])
1794             if 8 + fromPathLen + toPathLen != len(extensionData): return defer.execute(_bad)
1795
1796             fromPathstring = extensionData[4:(4 + fromPathLen)]
1797             toPathstring = extensionData[(8 + fromPathLen):]
1798             d = self.renameFile(fromPathstring, toPathstring, overwrite=True)
1799
1800             # Twisted conch assumes that the response from an extended request is either
1801             # an error, or an FXP_EXTENDED_REPLY. But it happens to do the right thing
1802             # (respond with an FXP_STATUS message) if we return a Failure with code FX_OK.
1803             def _succeeded(ign):
1804                 raise SFTPError(FX_OK, "request succeeded")
1805             d.addCallback(_succeeded)
1806             return d
1807
1808         if extensionName == 'statvfs@openssh.com' or extensionName == 'fstatvfs@openssh.com':
1809             # f_bsize and f_frsize should be the same to avoid a bug in 'df'
1810             return defer.succeed(struct.pack('>11Q',
1811                 1024,         # uint64  f_bsize     /* file system block size */
1812                 1024,         # uint64  f_frsize    /* fundamental fs block size */
1813                 628318530,    # uint64  f_blocks    /* number of blocks (unit f_frsize) */
1814                 314159265,    # uint64  f_bfree     /* free blocks in file system */
1815                 314159265,    # uint64  f_bavail    /* free blocks for non-root */
1816                 200000000,    # uint64  f_files     /* total file inodes */
1817                 100000000,    # uint64  f_ffree     /* free file inodes */
1818                 100000000,    # uint64  f_favail    /* free file inodes for non-root */
1819                 0x1AF5,       # uint64  f_fsid      /* file system id */
1820                 2,            # uint64  f_flag      /* bit mask = ST_NOSUID; not ST_RDONLY */
1821                 65535,        # uint64  f_namemax   /* maximum filename length */
1822                 ))
1823
1824         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "unsupported %r request <data of length %r>" %
1825                                                                (extensionName, len(extensionData)))
1826         return defer.execute(_unsupported)
1827
1828     def realPath(self, pathstring):
1829         self.log(".realPath(%r)" % (pathstring,), level=OPERATIONAL)
1830
1831         return self._path_to_utf8(self._path_from_string(pathstring))
1832
1833     def _path_to_utf8(self, path):
1834         return (u"/" + u"/".join(path)).encode('utf-8')
1835
1836     def _path_from_string(self, pathstring):
1837         if noisy: self.log("CONVERT %r" % (pathstring,), level=NOISY)
1838
1839         assert isinstance(pathstring, str), pathstring
1840
1841         # The home directory is the root directory.
1842         pathstring = pathstring.strip("/")
1843         if pathstring == "" or pathstring == ".":
1844             path_utf8 = []
1845         else:
1846             path_utf8 = pathstring.split("/")
1847
1848         # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.2>
1849         # "Servers SHOULD interpret a path name component ".." as referring to
1850         #  the parent directory, and "." as referring to the current directory."
1851         path = []
1852         for p_utf8 in path_utf8:
1853             if p_utf8 == "..":
1854                 # ignore excess .. components at the root
1855                 if len(path) > 0:
1856                     path = path[:-1]
1857             elif p_utf8 != ".":
1858                 try:
1859                     p = p_utf8.decode('utf-8', 'strict')
1860                 except UnicodeError:
1861                     raise SFTPError(FX_NO_SUCH_FILE, "path could not be decoded as UTF-8")
1862                 path.append(p)
1863
1864         if noisy: self.log(" PATH %r" % (path,), level=NOISY)
1865         return path
1866
1867     def _get_root(self, path):
1868         # return Deferred (root, remaining_path)
1869         d = defer.succeed(None)
1870         if path and path[0] == u"uri":
1871             d.addCallback(lambda ign: self._client.create_node_from_uri(path[1].encode('utf-8')))
1872             d.addCallback(lambda root: (root, path[2:]))
1873         else:
1874             d.addCallback(lambda ign: (self._root, path))
1875         return d
1876
1877     def _get_parent_or_node(self, path):
1878         # return Deferred (parent, childname) or (node, None)
1879         d = self._get_root(path)
1880         def _got_root( (root, remaining_path) ):
1881             if not remaining_path:
1882                 return (root, None)
1883             else:
1884                 d2 = root.get_child_at_path(remaining_path[:-1])
1885                 d2.addCallback(lambda parent: (parent, remaining_path[-1]))
1886                 return d2
1887         d.addCallback(_got_root)
1888         return d
1889
1890
1891 class FakeTransport:
1892     implements(ITransport)
1893     def write(self, data):
1894         logmsg("FakeTransport.write(<data of length %r>)" % (len(data),), level=NOISY)
1895
1896     def writeSequence(self, data):
1897         logmsg("FakeTransport.writeSequence(...)", level=NOISY)
1898
1899     def loseConnection(self):
1900         logmsg("FakeTransport.loseConnection()", level=NOISY)
1901
1902     # getPeer and getHost can just raise errors, since we don't know what to return
1903
1904
1905 class ShellSession(PrefixingLogMixin):
1906     implements(ISession)
1907     def __init__(self, userHandler):
1908         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
1909         if noisy: self.log(".__init__(%r)" % (userHandler), level=NOISY)
1910
1911     def getPty(self, terminal, windowSize, attrs):
1912         self.log(".getPty(%r, %r, %r)" % (terminal, windowSize, attrs), level=OPERATIONAL)
1913
1914     def openShell(self, protocol):
1915         self.log(".openShell(%r)" % (protocol,), level=OPERATIONAL)
1916         if hasattr(protocol, 'transport') and protocol.transport is None:
1917             protocol.transport = FakeTransport()  # work around Twisted bug
1918
1919         d = defer.succeed(None)
1920         d.addCallback(lambda ign: protocol.write("This server supports only SFTP, not shell sessions.\n"))
1921         d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessTerminated(exitCode=1))))
1922         return d
1923
1924     def execCommand(self, protocol, cmd):
1925         self.log(".execCommand(%r, %r)" % (protocol, cmd), level=OPERATIONAL)
1926         if hasattr(protocol, 'transport') and protocol.transport is None:
1927             protocol.transport = FakeTransport()  # work around Twisted bug
1928
1929         d = defer.succeed(None)
1930         if cmd == "df -P -k /":
1931             d.addCallback(lambda ign: protocol.write(
1932                           "Filesystem         1024-blocks      Used Available Capacity Mounted on\n"
1933                           "tahoe                628318530 314159265 314159265      50% /\n"))
1934             d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessDone(None))))
1935         else:
1936             d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessTerminated(exitCode=1))))
1937         return d
1938
1939     def windowChanged(self, newWindowSize):
1940         self.log(".windowChanged(%r)" % (newWindowSize,), level=OPERATIONAL)
1941
1942     def eofReceived(self):
1943         self.log(".eofReceived()", level=OPERATIONAL)
1944
1945     def closed(self):
1946         self.log(".closed()", level=OPERATIONAL)
1947
1948
1949 # If you have an SFTPUserHandler and want something that provides ISession, you get
1950 # ShellSession(userHandler).
1951 # We use adaptation because this must be a different object to the SFTPUserHandler.
1952 components.registerAdapter(ShellSession, SFTPUserHandler, ISession)
1953
1954
1955 from allmydata.frontends.auth import AccountURLChecker, AccountFileChecker, NeedRootcapLookupScheme
1956
1957 class Dispatcher:
1958     implements(portal.IRealm)
1959     def __init__(self, client):
1960         self._client = client
1961
1962     def requestAvatar(self, avatarID, mind, interface):
1963         assert interface == IConchUser, interface
1964         rootnode = self._client.create_node_from_uri(avatarID.rootcap)
1965         handler = SFTPUserHandler(self._client, rootnode, avatarID.username)
1966         return (interface, handler, handler.logout)
1967
1968
1969 class SFTPServer(service.MultiService):
1970     def __init__(self, client, accountfile, accounturl,
1971                  sftp_portstr, pubkey_file, privkey_file):
1972         service.MultiService.__init__(self)
1973
1974         r = Dispatcher(client)
1975         p = portal.Portal(r)
1976
1977         if accountfile:
1978             c = AccountFileChecker(self, accountfile)
1979             p.registerChecker(c)
1980         if accounturl:
1981             c = AccountURLChecker(self, accounturl)
1982             p.registerChecker(c)
1983         if not accountfile and not accounturl:
1984             # we could leave this anonymous, with just the /uri/CAP form
1985             raise NeedRootcapLookupScheme("must provide an account file or URL")
1986
1987         pubkey = keys.Key.fromFile(pubkey_file)
1988         privkey = keys.Key.fromFile(privkey_file)
1989         class SSHFactory(factory.SSHFactory):
1990             publicKeys = {pubkey.sshType(): pubkey}
1991             privateKeys = {privkey.sshType(): privkey}
1992             def getPrimes(self):
1993                 try:
1994                     # if present, this enables diffie-hellman-group-exchange
1995                     return primes.parseModuliFile("/etc/ssh/moduli")
1996                 except IOError:
1997                     return None
1998
1999         f = SSHFactory()
2000         f.portal = p
2001
2002         s = strports.service(sftp_portstr, f)
2003         s.setServiceParent(self)