]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/sftpd.py
SFTP: fix pyflakes warnings; drop 'noisy' versions of eventually_callback and eventua...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / sftpd.py
1
2 import os, tempfile, heapq, binascii, traceback, array, stat, struct
3 from stat import S_IFREG, S_IFDIR
4 from time import time, strftime, localtime
5
6 from zope.interface import implements
7 from twisted.python import components
8 from twisted.application import service, strports
9 from twisted.conch.ssh import factory, keys, session
10 from twisted.conch.ssh.filetransfer import FileTransferServer, SFTPError, \
11      FX_NO_SUCH_FILE, FX_OP_UNSUPPORTED, FX_PERMISSION_DENIED, FX_EOF, \
12      FX_BAD_MESSAGE, FX_FAILURE
13 from twisted.conch.ssh.filetransfer import FXF_READ, FXF_WRITE, FXF_APPEND, \
14      FXF_CREAT, FXF_TRUNC, FXF_EXCL
15 from twisted.conch.interfaces import ISFTPServer, ISFTPFile, IConchUser, ISession
16 from twisted.conch.avatar import ConchUser
17 from twisted.conch.openssh_compat import primes
18 from twisted.cred import portal
19 from twisted.internet.error import ProcessDone, ProcessTerminated
20 from twisted.python.failure import Failure
21 from twisted.internet.interfaces import ITransport
22
23 from twisted.internet import defer
24 from twisted.internet.interfaces import IFinishableConsumer
25 from foolscap.api import eventually
26 from allmydata.util import deferredutil
27
28 from allmydata.util.consumer import download_to_data
29 from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
30      NoSuchChildError
31 from allmydata.mutable.common import NotWriteableError
32 from allmydata.immutable.upload import FileHandle
33
34 from pycryptopp.cipher.aes import AES
35
36 # twisted.conch.ssh.filetransfer generates this warning, but not when it is imported,
37 # only on an error.
38 import warnings
39 warnings.filterwarnings("ignore", category=DeprecationWarning,
40     message="BaseException.message has been deprecated as of Python 2.6",
41     module=".*filetransfer", append=True)
42
43 noisy = True
44 use_foolscap_logging = True
45
46 from allmydata.util.log import NOISY, OPERATIONAL, \
47     msg as _msg, err as _err, PrefixingLogMixin as _PrefixingLogMixin
48
49 if use_foolscap_logging:
50     (logmsg, logerr, PrefixingLogMixin) = (_msg, _err, _PrefixingLogMixin)
51 else:  # pragma: no cover
52     def logmsg(s, level=None):
53         print s
54     def logerr(s, level=None):
55         print s
56     class PrefixingLogMixin:
57         def __init__(self, facility=None):
58             pass
59         def log(self, s, level=None):
60             print s
61
62
63 def eventually_callback(d):
64     return lambda res: eventually(d.callback, res)
65
66 def eventually_errback(d):
67     return lambda err: eventually(d.errback, err)
68
69
70 def _utf8(x):
71     if isinstance(x, unicode):
72         return x.encode('utf-8')
73     if isinstance(x, str):
74         return x
75     return repr(x)
76
77
78 def _convert_error(res, request):
79     if not isinstance(res, Failure):
80         logged_res = res
81         if isinstance(res, str): logged_res = "<data of length %r>" % (len(res),)
82         logmsg("SUCCESS %r %r" % (request, logged_res,), level=OPERATIONAL)
83         return res
84
85     err = res
86     logmsg("RAISE %r %r" % (request, err,), level=OPERATIONAL)
87     if noisy and not use_foolscap_logging: traceback.print_exc(err)
88
89     # The message argument to SFTPError must not reveal information that
90     # might compromise anonymity.
91
92     if err.check(SFTPError):
93         # original raiser of SFTPError has responsibility to ensure anonymity
94         raise err
95     if err.check(NoSuchChildError):
96         childname = _utf8(err.value.args[0])
97         raise SFTPError(FX_NO_SUCH_FILE, childname)
98     if err.check(NotWriteableError):
99         msg = _utf8(err.value.args[0])
100         raise SFTPError(FX_PERMISSION_DENIED, msg)
101     if err.check(ExistingChildError):
102         # Versions of SFTP after v3 (which is what twisted.conch implements)
103         # define a specific error code for this case: FX_FILE_ALREADY_EXISTS.
104         # However v3 doesn't; instead, other servers such as sshd return
105         # FX_FAILURE. The gvfs SFTP backend, for example, depends on this
106         # to translate the error to the equivalent of POSIX EEXIST, which is
107         # necessary for some picky programs (such as gedit).
108         msg = _utf8(err.value.args[0])
109         raise SFTPError(FX_FAILURE, msg)
110     if err.check(NotImplementedError):
111         raise SFTPError(FX_OP_UNSUPPORTED, _utf8(err.value))
112     if err.check(EOFError):
113         raise SFTPError(FX_EOF, "end of file reached")
114     if err.check(defer.FirstError):
115         _convert_error(err.value.subFailure, request)
116
117     # We assume that the error message is not anonymity-sensitive.
118     raise SFTPError(FX_FAILURE, _utf8(err.value))
119
120
121 def _repr_flags(flags):
122     return "|".join([f for f in
123                      [(flags & FXF_READ)   and "FXF_READ"   or None,
124                       (flags & FXF_WRITE)  and "FXF_WRITE"  or None,
125                       (flags & FXF_APPEND) and "FXF_APPEND" or None,
126                       (flags & FXF_CREAT)  and "FXF_CREAT"  or None,
127                       (flags & FXF_TRUNC)  and "FXF_TRUNC"  or None,
128                       (flags & FXF_EXCL)   and "FXF_EXCL"   or None,
129                      ]
130                      if f])
131
132
133 def _lsLine(name, attrs):
134     st_uid = "tahoe"
135     st_gid = "tahoe"
136     st_mtime = attrs.get("mtime", 0)
137     st_mode = attrs["permissions"]
138     # TODO: check that clients are okay with this being a "?".
139     # (They should be because the longname is intended for human
140     # consumption.)
141     st_size = attrs.get("size", "?")
142     # We don't know how many links there really are to this object.
143     st_nlink = 1
144
145     # Based on <http://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
146     # We can't call the version in Twisted because we might have a version earlier than
147     # <http://twistedmatrix.com/trac/changeset/25412> (released in Twisted 8.2).
148
149     mode = st_mode
150     perms = array.array('c', '-'*10)
151     ft = stat.S_IFMT(mode)
152     if   stat.S_ISDIR(ft):  perms[0] = 'd'
153     elif stat.S_ISCHR(ft):  perms[0] = 'c'
154     elif stat.S_ISBLK(ft):  perms[0] = 'b'
155     elif stat.S_ISREG(ft):  perms[0] = '-'
156     elif stat.S_ISFIFO(ft): perms[0] = 'f'
157     elif stat.S_ISLNK(ft):  perms[0] = 'l'
158     elif stat.S_ISSOCK(ft): perms[0] = 's'
159     else: perms[0] = '?'
160     # user
161     if mode&stat.S_IRUSR: perms[1] = 'r'
162     if mode&stat.S_IWUSR: perms[2] = 'w'
163     if mode&stat.S_IXUSR: perms[3] = 'x'
164     # group
165     if mode&stat.S_IRGRP: perms[4] = 'r'
166     if mode&stat.S_IWGRP: perms[5] = 'w'
167     if mode&stat.S_IXGRP: perms[6] = 'x'
168     # other
169     if mode&stat.S_IROTH: perms[7] = 'r'
170     if mode&stat.S_IWOTH: perms[8] = 'w'
171     if mode&stat.S_IXOTH: perms[9] = 'x'
172     # suid/sgid never set
173
174     l = perms.tostring()
175     l += str(st_nlink).rjust(5) + ' '
176     un = str(st_uid)
177     l += un.ljust(9)
178     gr = str(st_gid)
179     l += gr.ljust(9)
180     sz = str(st_size)
181     l += sz.rjust(8)
182     l += ' '
183     sixmo = 60 * 60 * 24 * 7 * 26
184     if st_mtime + sixmo < time(): # last edited more than 6mo ago
185         l += strftime("%b %d  %Y ", localtime(st_mtime))
186     else:
187         l += strftime("%b %d %H:%M ", localtime(st_mtime))
188     l += name
189     return l
190
191
192 def _is_readonly(parent_readonly, child):
193     """Whether child should be listed as having read-only permissions in parent."""
194
195     if child.is_unknown():
196         return True
197     elif child.is_mutable():
198         return child.is_readonly()
199     else:
200         return parent_readonly
201
202
203 def _populate_attrs(childnode, metadata, size=None):
204     attrs = {}
205
206     # The permissions must have the S_IFDIR (040000) or S_IFREG (0100000)
207     # bits, otherwise the client may refuse to open a directory.
208     # Also, sshfs run as a non-root user requires files and directories
209     # to be world-readable/writeable.
210     #
211     # Directories and unknown nodes have no size, and SFTP doesn't
212     # require us to make one up.
213     #
214     # childnode might be None, meaning that the file doesn't exist yet,
215     # but we're going to write it later.
216
217     if childnode and childnode.is_unknown():
218         perms = 0
219     elif childnode and IDirectoryNode.providedBy(childnode):
220         perms = S_IFDIR | 0777
221     else:
222         # For files, omit the size if we don't immediately know it.
223         if childnode and size is None:
224             size = childnode.get_size()
225         if size is not None:
226             assert isinstance(size, (int, long)) and not isinstance(size, bool), repr(size)
227             attrs['size'] = size
228         perms = S_IFREG | 0666
229
230     if metadata:
231         assert 'readonly' in metadata, metadata
232         if metadata['readonly']:
233             perms &= S_IFDIR | S_IFREG | 0555  # clear 'w' bits
234
235         # see webapi.txt for what these times mean
236         if 'linkmotime' in metadata.get('tahoe', {}):
237             attrs['mtime'] = int(metadata['tahoe']['linkmotime'])
238         elif 'mtime' in metadata:
239             # We would prefer to omit atime, but SFTP version 3 can only
240             # accept mtime if atime is also set.
241             attrs['mtime'] = int(metadata['mtime'])
242             attrs['atime'] = attrs['mtime']
243
244         if 'linkcrtime' in metadata.get('tahoe', {}):
245             attrs['createtime'] = int(metadata['tahoe']['linkcrtime'])
246
247         if 'ctime' in metadata:
248             attrs['ctime'] = int(metadata['ctime'])
249
250     attrs['permissions'] = perms
251
252     # twisted.conch.ssh.filetransfer only implements SFTP version 3,
253     # which doesn't include SSH_FILEXFER_ATTR_FLAGS.
254
255     return attrs
256
257
258 class EncryptedTemporaryFile(PrefixingLogMixin):
259     # not implemented: next, readline, readlines, xreadlines, writelines
260
261     def __init__(self):
262         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
263         self.file = tempfile.TemporaryFile()
264         self.key = os.urandom(16)  # AES-128
265
266     def _crypt(self, offset, data):
267         # FIXME: use random-access AES (pycryptopp ticket #18)
268         offset_big = offset // 16
269         offset_small = offset % 16
270         iv = binascii.unhexlify("%032x" % offset_big)
271         cipher = AES(self.key, iv=iv)
272         cipher.process("\x00"*offset_small)
273         return cipher.process(data)
274
275     def close(self):
276         self.file.close()
277
278     def flush(self):
279         self.file.flush()
280
281     def seek(self, offset, whence=os.SEEK_SET):
282         if noisy: self.log(".seek(%r, %r)" % (offset, whence), level=NOISY)
283         self.file.seek(offset, whence)
284
285     def tell(self):
286         offset = self.file.tell()
287         if noisy: self.log(".tell() = %r" % (offset,), level=NOISY)
288         return offset
289
290     def read(self, size=-1):
291         if noisy: self.log(".read(%r)" % (size,), level=NOISY)
292         index = self.file.tell()
293         ciphertext = self.file.read(size)
294         plaintext = self._crypt(index, ciphertext)
295         return plaintext
296
297     def write(self, plaintext):
298         if noisy: self.log(".write(<data of length %r>)" % (len(plaintext),), level=NOISY)
299         index = self.file.tell()
300         ciphertext = self._crypt(index, plaintext)
301         self.file.write(ciphertext)
302
303     def truncate(self, newsize):
304         if noisy: self.log(".truncate(%r)" % (newsize,), level=NOISY)
305         self.file.truncate(newsize)
306
307
308 class OverwriteableFileConsumer(PrefixingLogMixin):
309     implements(IFinishableConsumer)
310     """I act both as a consumer for the download of the original file contents, and as a
311     wrapper for a temporary file that records the downloaded data and any overwrites.
312     I use a priority queue to keep track of which regions of the file have been overwritten
313     but not yet downloaded, so that the download does not clobber overwritten data.
314     I use another priority queue to record milestones at which to make callbacks
315     indicating that a given number of bytes have been downloaded.
316
317     The temporary file reflects the contents of the file that I represent, except that:
318      - regions that have neither been downloaded nor overwritten, if present,
319        contain zeroes.
320      - the temporary file may be shorter than the represented file (it is never longer).
321        The latter's current size is stored in self.current_size.
322
323     This abstraction is mostly independent of SFTP. Consider moving it, if it is found
324     useful for other frontends."""
325
326     def __init__(self, check_abort, download_size, tempfile_maker):
327         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
328         if noisy: self.log(".__init__(%r, %r, %r)" % (check_abort, download_size, tempfile_maker), level=NOISY)
329         self.check_abort = check_abort
330         self.download_size = download_size
331         self.current_size = download_size
332         self.f = tempfile_maker()
333         self.downloaded = 0
334         self.milestones = []  # empty heap of (offset, d)
335         self.overwrites = []  # empty heap of (start, end)
336         self.done = self.when_reached(download_size)  # adds a milestone
337         self.is_done = False
338         def _signal_done(ign):
339             if noisy: self.log("DONE", level=NOISY)
340             self.is_done = True
341         self.done.addCallback(_signal_done)
342         self.producer = None
343
344     def get_file(self):
345         return self.f
346
347     def get_current_size(self):
348         return self.current_size
349
350     def set_current_size(self, size):
351         if noisy: self.log(".set_current_size(%r), current_size = %r, downloaded = %r" %
352                            (size, self.current_size, self.downloaded), level=NOISY)
353         if size < self.current_size or size < self.downloaded:
354             self.f.truncate(size)
355         self.current_size = size
356         if size < self.download_size:
357             self.download_size = size
358         if self.downloaded >= self.download_size:
359             self.finish()
360
361     def registerProducer(self, p, streaming):
362         if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY)
363         self.producer = p
364         if streaming:
365             # call resumeProducing once to start things off
366             p.resumeProducing()
367         else:
368             def _iterate():
369                 if not self.is_done:
370                     p.resumeProducing()
371                     eventually(_iterate)
372             _iterate()
373
374     def write(self, data):
375         if noisy: self.log(".write(<data of length %r>)" % (len(data),), level=NOISY)
376         if self.check_abort():
377             self.close()
378             return
379
380         if self.downloaded >= self.download_size:
381             return
382
383         next_downloaded = self.downloaded + len(data)
384         if next_downloaded > self.download_size:
385             data = data[:(self.download_size - self.downloaded)]
386
387         while len(self.overwrites) > 0:
388             (start, end) = self.overwrites[0]
389             if start >= next_downloaded:
390                 # This and all remaining overwrites are after the data we just downloaded.
391                 break
392             if start > self.downloaded:
393                 # The data we just downloaded has been partially overwritten.
394                 # Write the prefix of it that precedes the overwritten region.
395                 self.f.seek(self.downloaded)
396                 self.f.write(data[:(start - self.downloaded)])
397
398             # This merges consecutive overwrites if possible, which allows us to detect the
399             # case where the download can be stopped early because the remaining region
400             # to download has already been fully overwritten.
401             heapq.heappop(self.overwrites)
402             while len(self.overwrites) > 0:
403                 (start1, end1) = self.overwrites[0]
404                 if start1 > end:
405                     break
406                 end = end1
407                 heapq.heappop(self.overwrites)
408
409             if end >= next_downloaded:
410                 # This overwrite extends past the downloaded data, so there is no
411                 # more data to consider on this call.
412                 heapq.heappush(self.overwrites, (next_downloaded, end))
413                 self._update_downloaded(next_downloaded)
414                 return
415             elif end >= self.downloaded:
416                 data = data[(end - self.downloaded):]
417                 self._update_downloaded(end)
418
419         self.f.seek(self.downloaded)
420         self.f.write(data)
421         self._update_downloaded(next_downloaded)
422
423     def _update_downloaded(self, new_downloaded):
424         self.downloaded = new_downloaded
425         milestone = new_downloaded
426         if len(self.overwrites) > 0:
427             (start, end) = self.overwrites[0]
428             if start <= new_downloaded and end > milestone:
429                 milestone = end
430
431         while len(self.milestones) > 0:
432             (next, d) = self.milestones[0]
433             if next > milestone:
434                 return
435             if noisy: self.log("MILESTONE %r %r" % (next, d), level=NOISY)
436             heapq.heappop(self.milestones)
437             eventually_callback(d)(None)
438
439         if milestone >= self.download_size:
440             self.finish()
441
442     def overwrite(self, offset, data):
443         if noisy: self.log(".overwrite(%r, <data of length %r>)" % (offset, len(data)), level=NOISY)
444         if offset > self.download_size and offset > self.current_size:
445             # Normally writing at an offset beyond the current end-of-file
446             # would leave a hole that appears filled with zeroes. However, an
447             # EncryptedTemporaryFile doesn't behave like that (if there is a
448             # hole in the file on disk, the zeroes that are read back will be
449             # XORed with the keystream). So we must explicitly write zeroes in
450             # the gap between the current EOF and the offset.
451
452             self.f.seek(self.current_size)
453             self.f.write("\x00" * (offset - self.current_size))            
454         else:
455             self.f.seek(offset)
456         self.f.write(data)
457         end = offset + len(data)
458         self.current_size = max(self.current_size, end)
459         if end > self.downloaded:
460             heapq.heappush(self.overwrites, (offset, end))
461
462     def read(self, offset, length):
463         """When the data has been read, callback the Deferred that we return with this data.
464         Otherwise errback the Deferred that we return.
465         The caller must perform no more overwrites until the Deferred has fired."""
466
467         if noisy: self.log(".read(%r, %r), current_size = %r" % (offset, length, self.current_size), level=NOISY)
468         if offset >= self.current_size:
469             def _eof(): raise EOFError("read past end of file")
470             return defer.execute(_eof)
471
472         if offset + length > self.current_size:
473             length = self.current_size - offset
474             if noisy: self.log("truncating read to %r bytes" % (length,), level=NOISY)
475
476         needed = min(offset + length, self.download_size)
477         d = self.when_reached(needed)
478         def _reached(ign):
479             # It is not necessarily the case that self.downloaded >= needed, because
480             # the file might have been truncated (thus truncating the download) and
481             # then extended.
482
483             assert self.current_size >= offset + length, (self.current_size, offset, length)
484             if noisy: self.log("self.f = %r" % (self.f,), level=NOISY)
485             self.f.seek(offset)
486             return self.f.read(length)
487         d.addCallback(_reached)
488         return d
489
490     def when_reached(self, index):
491         if noisy: self.log(".when_reached(%r)" % (index,), level=NOISY)
492         if index <= self.downloaded:  # already reached
493             if noisy: self.log("already reached %r" % (index,), level=NOISY)
494             return defer.succeed(None)
495         d = defer.Deferred()
496         def _reached(ign):
497             if noisy: self.log("reached %r" % (index,), level=NOISY)
498             return ign
499         d.addCallback(_reached)
500         heapq.heappush(self.milestones, (index, d))
501         return d
502
503     def when_done(self):
504         return self.done
505
506     def finish(self):
507         while len(self.milestones) > 0:
508             (next, d) = self.milestones[0]
509             if noisy: self.log("MILESTONE FINISH %r %r" % (next, d), level=NOISY)
510             heapq.heappop(self.milestones)
511             # The callback means that the milestone has been reached if
512             # it is ever going to be. Note that the file may have been
513             # truncated to before the milestone.
514             eventually_callback(d)(None)
515
516         # FIXME: causes spurious failures
517         #self.unregisterProducer()
518
519     def close(self):
520         self.finish()
521         self.f.close()
522
523     def unregisterProducer(self):
524         if self.producer:
525             self.producer.stopProducing()
526             self.producer = None
527
528
529 SIZE_THRESHOLD = 1000
530
531
532 class ShortReadOnlySFTPFile(PrefixingLogMixin):
533     implements(ISFTPFile)
534     """I represent a file handle to a particular file on an SFTP connection.
535     I am used only for short immutable files opened in read-only mode.
536     The file contents are downloaded to memory when I am created."""
537
538     def __init__(self, filenode, metadata):
539         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
540         if noisy: self.log(".__init__(%r, %r)" % (filenode, metadata), level=NOISY)
541
542         assert IFileNode.providedBy(filenode), filenode
543         self.filenode = filenode
544         self.metadata = metadata
545         self.async = download_to_data(filenode)
546         self.closed = False
547
548     def readChunk(self, offset, length):
549         request = ".readChunk(%r, %r)" % (offset, length)
550         self.log(request, level=OPERATIONAL)
551
552         if self.closed:
553             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
554             return defer.execute(_closed)
555
556         d = defer.Deferred()
557         def _read(data):
558             if noisy: self.log("_read(%r) in readChunk(%r, %r)" % (data, offset, length), level=NOISY)
559
560             # "In response to this request, the server will read as many bytes as it
561             #  can from the file (up to 'len'), and return them in a SSH_FXP_DATA
562             #  message.  If an error occurs or EOF is encountered before reading any
563             #  data, the server will respond with SSH_FXP_STATUS.  For normal disk
564             #  files, it is guaranteed that this will read the specified number of
565             #  bytes, or up to end of file."
566             #
567             # i.e. we respond with an EOF error iff offset is already at EOF.
568
569             if offset >= len(data):
570                 eventually_errback(d)(SFTPError(FX_EOF, "read at or past end of file"))
571             else:
572                 eventually_callback(d)(data[offset:min(offset+length, len(data))])
573             return data
574         self.async.addCallbacks(_read, eventually_errback(d))
575         d.addBoth(_convert_error, request)
576         return d
577
578     def writeChunk(self, offset, data):
579         self.log(".writeChunk(%r, <data of length %r>) denied" % (offset, len(data)), level=OPERATIONAL)
580
581         def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
582         return defer.execute(_denied)
583
584     def close(self):
585         self.log(".close()", level=OPERATIONAL)
586
587         self.closed = True
588         return defer.succeed(None)
589
590     def getAttrs(self):
591         request = ".getAttrs()"
592         self.log(request, level=OPERATIONAL)
593
594         if self.closed:
595             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
596             return defer.execute(_closed)
597
598         d = defer.execute(_populate_attrs, self.filenode, self.metadata)
599         d.addBoth(_convert_error, request)
600         return d
601
602     def setAttrs(self, attrs):
603         self.log(".setAttrs(%r) denied" % (attrs,), level=OPERATIONAL)
604         def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
605         return defer.execute(_denied)
606
607
608 class GeneralSFTPFile(PrefixingLogMixin):
609     implements(ISFTPFile)
610     """I represent a file handle to a particular file on an SFTP connection.
611     I wrap an instance of OverwriteableFileConsumer, which is responsible for
612     storing the file contents. In order to allow write requests to be satisfied
613     immediately, there is effectively a FIFO queue between requests made to this
614     file handle, and requests to my OverwriteableFileConsumer. This queue is
615     implemented by the callback chain of self.async."""
616
617     def __init__(self, close_notify, check_abort, flags, convergence, parent=None, childname=None, filenode=None, metadata=None):
618         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
619         if noisy: self.log(".__init__(%r, %r, %r, <convergence censored>, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
620                            (close_notify, check_abort, flags, parent, childname, filenode, metadata), level=NOISY)
621
622         self.close_notify = close_notify
623         self.check_abort = check_abort
624         self.flags = flags
625         self.convergence = convergence
626         self.parent = parent
627         self.childname = childname
628         self.filenode = filenode
629         self.metadata = metadata
630         self.async = defer.succeed(None)
631         # Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created.
632         self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL)
633         self.closed = False
634         
635         # self.consumer should only be relied on in callbacks for self.async, since it might
636         # not be set before then.
637         self.consumer = None
638         tempfile_maker = EncryptedTemporaryFile
639
640         if (flags & FXF_TRUNC) or not filenode:
641             # We're either truncating or creating the file, so we don't need the old contents.
642             self.consumer = OverwriteableFileConsumer(self.check_abort, 0, tempfile_maker)
643             self.consumer.finish()
644         else:
645             assert IFileNode.providedBy(filenode), filenode
646
647             # TODO: use download interface described in #993 when implemented.
648             if filenode.is_mutable():
649                 self.async.addCallback(lambda ign: filenode.download_best_version())
650                 def _downloaded(data):
651                     self.consumer = OverwriteableFileConsumer(self.check_abort, len(data), tempfile_maker)
652                     self.consumer.write(data)
653                     self.consumer.finish()
654                     return None
655                 self.async.addCallback(_downloaded)
656             else:
657                 download_size = filenode.get_size()
658                 assert download_size is not None, "download_size is None"
659                 self.consumer = OverwriteableFileConsumer(self.check_abort, download_size, tempfile_maker)
660                 def _read(ign):
661                     if noisy: self.log("_read immutable", level=NOISY)
662                     filenode.read(self.consumer, 0, None)
663                 self.async.addCallback(_read)
664
665         if noisy: self.log("__init__ done", level=NOISY)
666
667     def rename(self, new_parent, new_childname):
668         self.log(".rename(%r, %r)" % (new_parent, new_childname), level=OPERATIONAL)
669
670         self.parent = new_parent
671         self.childname = new_childname
672
673     def readChunk(self, offset, length):
674         request = ".readChunk(%r, %r)" % (offset, length)
675         self.log(request, level=OPERATIONAL)
676
677         if not (self.flags & FXF_READ):
678             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for reading")
679             return defer.execute(_denied)
680
681         if self.closed:
682             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
683             return defer.execute(_closed)
684
685         d = defer.Deferred()
686         def _read(ign):
687             if noisy: self.log("_read in readChunk(%r, %r)" % (offset, length), level=NOISY)
688             d2 = self.consumer.read(offset, length)
689             d2.addErrback(_convert_error, request)
690             d2.addCallbacks(eventually_callback(d), eventually_errback(d))
691             # It is correct to drop d2 here.
692             return None
693         self.async.addCallbacks(_read, eventually_errback(d))
694         d.addBoth(_convert_error, request)
695         return d
696
697     def writeChunk(self, offset, data):
698         self.log(".writeChunk(%r, <data of length %r>)" % (offset, len(data)), level=OPERATIONAL)
699
700         if not (self.flags & FXF_WRITE):
701             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
702             return defer.execute(_denied)
703
704         if self.closed:
705             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")
706             return defer.execute(_closed)
707
708         self.has_changed = True
709
710         # Note that we return without waiting for the write to occur. Reads and
711         # close wait for prior writes, and will fail if any prior operation failed.
712         # This is ok because SFTP makes no guarantee that the request completes
713         # before the write. In fact it explicitly allows write errors to be delayed
714         # until close:
715         #   "One should note that on some server platforms even a close can fail.
716         #    This can happen e.g. if the server operating system caches writes,
717         #    and an error occurs while flushing cached writes during the close."
718
719         def _write(ign):
720             if noisy: self.log("_write in .writeChunk(%r, <data of length %r>), current_size = %r" %
721                                (offset, len(data), self.consumer.get_current_size()), level=NOISY)
722             # FXF_APPEND means that we should always write at the current end of file.
723             write_offset = offset
724             if self.flags & FXF_APPEND:
725                 write_offset = self.consumer.get_current_size()
726
727             self.consumer.overwrite(write_offset, data)
728             if noisy: self.log("overwrite done", level=NOISY)
729             return None
730         self.async.addCallback(_write)
731         # don't addErrback to self.async, just allow subsequent async ops to fail.
732         return defer.succeed(None)
733
734     def close(self):
735         request = ".close()"
736         self.log(request, level=OPERATIONAL)
737
738         if self.closed:
739             return defer.succeed(None)
740
741         # This means that close has been called, not that the close has succeeded.
742         self.closed = True
743
744         if not (self.flags & (FXF_WRITE | FXF_CREAT)):
745             return defer.execute(self.consumer.close)
746
747         def _close(ign):
748             d2 = defer.succeed(None)
749             if self.has_changed:
750                 d2.addCallback(lambda ign: self.consumer.when_done())
751                 if self.filenode and self.filenode.is_mutable():
752                     d2.addCallback(lambda ign: self.consumer.get_current_size())
753                     d2.addCallback(lambda size: self.consumer.read(0, size))
754                     d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
755                 else:
756                     def _add_file(ign):
757                         self.log("_add_file childname=%r" % (self.childname,), level=OPERATIONAL)
758                         u = FileHandle(self.consumer.get_file(), self.convergence)
759                         return self.parent.add_file(self.childname, u)
760                     d2.addCallback(_add_file)
761
762             d2.addCallback(lambda ign: self.consumer.close())
763             return d2
764         self.async.addCallback(_close)
765
766         d = defer.Deferred()
767         self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
768
769         def _closed(res):
770             self.close_notify(self.parent, self.childname, self)
771             return res
772         d.addBoth(_closed)
773         d.addBoth(_convert_error, request)
774         return d
775
776     def getAttrs(self):
777         request = ".getAttrs()"
778         self.log(request, level=OPERATIONAL)
779
780         if self.closed:
781             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
782             return defer.execute(_closed)
783
784         # Optimization for read-only handles, when we already know the metadata.
785         if not(self.flags & (FXF_WRITE | FXF_CREAT)) and self.metadata and self.filenode and not self.filenode.is_mutable():
786             return defer.succeed(_populate_attrs(self.filenode, self.metadata))
787
788         d = defer.Deferred()
789         def _get(ign):
790             # self.filenode might be None, but that's ok.
791             attrs = _populate_attrs(self.filenode, self.metadata, size=self.consumer.get_current_size())
792             eventually_callback(d)(attrs)
793             return None
794         self.async.addCallbacks(_get, eventually_errback(d))
795         d.addBoth(_convert_error, request)
796         return d
797
798     def setAttrs(self, attrs):
799         request = ".setAttrs(attrs) %r" % (attrs,)
800         self.log(request, level=OPERATIONAL)
801
802         if not (self.flags & FXF_WRITE):
803             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
804             return defer.execute(_denied)
805
806         if self.closed:
807             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot set attributes for a closed file handle")
808             return defer.execute(_closed)
809
810         if not "size" in attrs:
811             return defer.succeed(None)
812
813         size = attrs["size"]
814         if not isinstance(size, (int, long)) or size < 0:
815             def _bad(): raise SFTPError(FX_BAD_MESSAGE, "new size is not a valid nonnegative integer")
816             return defer.execute(_bad)
817
818         d = defer.Deferred()
819         def _resize(ign):
820             self.consumer.set_current_size(size)
821             eventually_callback(d)(None)
822             return None
823         self.async.addCallbacks(_resize, eventually_errback(d))
824         d.addBoth(_convert_error, request)
825         return d
826
827
828 class StoppableList:
829     def __init__(self, items):
830         self.items = items
831     def __iter__(self):
832         for i in self.items:
833             yield i
834     def close(self):
835         pass
836
837
838 class Reason:
839     def __init__(self, value):
840         self.value = value
841
842
843 # For each immutable file that has been opened with write flags
844 # (FXF_WRITE and/or FXF_CREAT) and is still open, this maps from
845 # parent_write_uri + "/" + childname_utf8, to (list_of_ISFTPFile, open_time_utc).
846 # Updates to this dict are single-threaded.
847
848 all_open_files = {}
849
850 class SFTPUserHandler(ConchUser, PrefixingLogMixin):
851     implements(ISFTPServer)
852     def __init__(self, client, rootnode, username):
853         ConchUser.__init__(self)
854         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
855         if noisy: self.log(".__init__(%r, %r, %r)" % (client, rootnode, username), level=NOISY)
856
857         self.channelLookup["session"] = session.SSHSession
858         self.subsystemLookup["sftp"] = FileTransferServer
859
860         self._client = client
861         self._root = rootnode
862         self._username = username
863         self._convergence = client.convergence
864         self._logged_out = False
865         self._open_files = {}  # files created by this user handler and still open
866
867     def gotVersion(self, otherVersion, extData):
868         self.log(".gotVersion(%r, %r)" % (otherVersion, extData), level=OPERATIONAL)
869
870         # advertise the same extensions as the OpenSSH SFTP server
871         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
872         return {'extposix-rename@openssh.com': '1',
873                 'statvfs@openssh.com': '2',
874                 'fstatvfs@openssh.com': '2',
875                }
876
877     def _add_open_files(self, direntry, files_to_add):
878         if direntry:
879             if direntry in self._open_files:
880                 self._open_files[direntry] += files_to_add
881             else:
882                 self._open_files[direntry] = files_to_add
883
884             if direntry in all_open_files:
885                 (old_files, opentime) = all_open_files[direntry]
886                 all_open_files[direntry] = (old_files + files_to_add, opentime)
887             else:
888                 all_open_files[direntry] = (files_to_add, time())
889
890     def _remove_open_files(self, direntry, files_to_remove):
891         if direntry and not self._logged_out:
892             assert direntry in self._open_files, (direntry, self._open_files)
893             assert direntry in all_open_files, (direntry, all_open_files)
894
895             old_files = self._open_files[direntry]
896             new_files = [f for f in old_files if f not in files_to_remove]
897             if len(new_files) > 0:
898                 self._open_files[direntry] = new_files
899             else:
900                 del self._open_files[direntry]
901
902             (all_old_files, opentime) = all_open_files[direntry]
903             all_new_files = [f for f in all_old_files if f not in files_to_remove]
904             if len(all_new_files) > 0:
905                 all_open_files[direntry] = (all_new_files, opentime)
906             else:
907                 del all_open_files[direntry]
908
909     def _rename_open_files(self, from_parent, from_childname, to_parent, to_childname):
910         """When an direntry is renamed, any open files for that direntry are also renamed.
911         Return True if there were any open files at from_direntry."""
912
913         from_direntry = self._direntry_for(from_parent, from_childname)
914         to_direntry = self._direntry_for(to_parent, to_childname)
915
916         if from_direntry in all_open_files:
917             (from_files, opentime) = all_open_files[from_direntry]
918             del self._open_files[from_direntry]
919             del all_open_files[from_direntry]
920             for file in from_files:
921                 file.rename(to_parent, to_childname)
922             self._add_open_files(to_direntry, from_files)
923             return True
924         else:
925             return False
926
927     def _direntry_for(self, parent, childname):
928         if parent and childname:
929             rw_uri = parent.get_write_uri()
930             if rw_uri:
931                 return rw_uri + "/" + childname.encode('utf-8')
932
933         return None
934
935     def logout(self):
936         if not self._logged_out:
937             self._logged_out = True
938             for (direntry, files_at_direntry) in enumerate(self._open_files):
939                 self._remove_open_files(direntry, files_at_direntry)
940
941     def _check_abort(self):
942         return self._logged_out
943
944     def _close_notify(self, parent, childname, f):
945         self._remove_open_files(self._direntry_for(parent, childname), [f])
946
947     def _make_file(self, flags, parent=None, childname=None, filenode=None, metadata=None):
948         if noisy: self.log("._make_file(%r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r" %
949                            (flags, _repr_flags(flags), parent, childname, filenode, metadata), level=NOISY)
950
951         assert metadata is None or 'readonly' in metadata, metadata
952         writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
953
954         if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
955             return ShortReadOnlySFTPFile(filenode, metadata)
956         else:
957             direntry = None
958             if writing:
959                 direntry = self._direntry_for(parent, childname)
960
961             file = GeneralSFTPFile(self._close_notify, self._check_abort, flags, self._convergence,
962                                    parent=parent, childname=childname, filenode=filenode, metadata=metadata)
963             self._add_open_files(direntry, [file])
964             return file
965
966     def openFile(self, pathstring, flags, attrs):
967         request = ".openFile(%r, %r = %r, %r)" % (pathstring, flags, _repr_flags(flags), attrs)
968         self.log(request, level=OPERATIONAL)
969
970         # This is used for both reading and writing.
971         # First exclude invalid combinations of flags.
972
973         # /usr/bin/sftp 'get' gives us FXF_READ, while 'put' on a new file
974         # gives FXF_WRITE | FXF_CREAT | FXF_TRUNC. I'm guessing that 'put' on an
975         # existing file gives the same.
976
977         if not (flags & (FXF_READ | FXF_WRITE)):
978             raise SFTPError(FX_BAD_MESSAGE,
979                             "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
980
981         if (flags & FXF_EXCL) and not (flags & FXF_CREAT):
982             raise SFTPError(FX_BAD_MESSAGE,
983                             "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT")
984
985         path = self._path_from_string(pathstring)
986         if not path:
987             raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty")
988
989         # The combination of flags is potentially valid. Now there are two major cases:
990         #
991         #  1. The path is specified as /uri/FILECAP, with no parent directory.
992         #     If the FILECAP is mutable and writeable, then we can open it in write-only
993         #     or read/write mode (non-exclusively), otherwise we can only open it in
994         #     read-only mode. The open should succeed immediately as long as FILECAP is
995         #     a valid known filecap that grants the required permission.
996         #
997         #  2. The path is specified relative to a parent. We find the parent dirnode and
998         #     get the child's URI and metadata if it exists. There are four subcases:
999         #       a. the child does not exist: FXF_CREAT must be set, and we must be able
1000         #          to write to the parent directory.
1001         #       b. the child exists but is not a valid known filecap: fail
1002         #       c. the child is mutable: if we are trying to open it write-only or
1003         #          read/write, then we must be able to write to the file.
1004         #       d. the child is immutable: if we are trying to open it write-only or
1005         #          read/write, then we must be able to write to the parent directory.
1006         #
1007         # To reduce latency, open succeeds as soon as these conditions are met, even
1008         # though there might be a failure in downloading the existing file or uploading
1009         # a new one.
1010         #
1011         # Note that the permission checks below are for more precise error reporting on
1012         # the open call; later operations would fail even if we did not make these checks.
1013
1014         d = self._get_root(path)
1015         def _got_root( (root, path) ):
1016             if root.is_unknown():
1017                 raise SFTPError(FX_PERMISSION_DENIED,
1018                                 "cannot open an unknown cap (or child of an unknown directory). "
1019                                 "Upgrading the gateway to a later Tahoe-LAFS version may help")
1020             if not path:
1021                 # case 1
1022                 if noisy: self.log("case 1: root = %r, path[:-1] = %r" % (root, path[:-1]), level=NOISY)
1023                 if not IFileNode.providedBy(root):
1024                     raise SFTPError(FX_PERMISSION_DENIED,
1025                                     "cannot open a directory cap")
1026                 if (flags & FXF_WRITE) and root.is_readonly():
1027                     raise SFTPError(FX_PERMISSION_DENIED,
1028                                     "cannot write to a non-writeable filecap without a parent directory")
1029                 if flags & FXF_EXCL:
1030                     raise SFTPError(FX_FAILURE,
1031                                     "cannot create a file exclusively when it already exists")
1032
1033                 # The file does not need to be added to all_open_files, because it is not
1034                 # associated with a directory entry that needs to be updated.
1035
1036                 return self._make_file(flags, filenode=root)
1037             else:
1038                 # case 2
1039                 childname = path[-1]
1040                 if noisy: self.log("case 2: root = %r, childname = %r, path[:-1] = %r" %
1041                                    (root, childname, path[:-1]), level=NOISY)
1042                 d2 = root.get_child_at_path(path[:-1])
1043                 def _got_parent(parent):
1044                     if noisy: self.log("_got_parent(%r)" % (parent,), level=NOISY)
1045                     if parent.is_unknown():
1046                         raise SFTPError(FX_PERMISSION_DENIED,
1047                                         "cannot open an unknown cap (or child of an unknown directory). "
1048                                         "Upgrading the gateway to a later Tahoe-LAFS version may help")
1049
1050                     parent_readonly = parent.is_readonly()
1051                     d3 = defer.succeed(None)
1052                     if flags & FXF_EXCL:
1053                         # FXF_EXCL means that the link to the file (not the file itself) must
1054                         # be created atomically wrt updates by this storage client.
1055                         # That is, we need to create the link before returning success to the
1056                         # SFTP open request (and not just on close, as would normally be the
1057                         # case). We make the link initially point to a zero-length LIT file,
1058                         # which is consistent with what might happen on a POSIX filesystem.
1059
1060                         if parent_readonly:
1061                             raise SFTPError(FX_FAILURE,
1062                                             "cannot create a file exclusively when the parent directory is read-only")
1063
1064                         # 'overwrite=False' ensures failure if the link already exists.
1065                         # FIXME: should use a single call to set_uri and return (child, metadata) (#1035)
1066
1067                         zero_length_lit = "URI:LIT:"
1068                         if noisy: self.log("%r.set_uri(%r, None, readcap=%r, overwrite=False)" %
1069                                            (parent, zero_length_lit, childname), level=NOISY)
1070                         d3.addCallback(lambda ign: parent.set_uri(childname, None, readcap=zero_length_lit, overwrite=False))
1071                         def _seturi_done(child):
1072                             if noisy: self.log("%r.get_metadata_for(%r)" % (parent, childname), level=NOISY)
1073                             d4 = parent.get_metadata_for(childname)
1074                             d4.addCallback(lambda metadata: (child, metadata))
1075                             return d4
1076                         d3.addCallback(_seturi_done)
1077                     else:
1078                         if noisy: self.log("%r.get_child_and_metadata(%r)" % (parent, childname), level=NOISY)
1079                         d3.addCallback(lambda ign: parent.get_child_and_metadata(childname))
1080
1081                     def _got_child( (filenode, metadata) ):
1082                         if noisy: self.log("_got_child( (%r, %r) )" % (filenode, metadata), level=NOISY)
1083
1084                         if filenode.is_unknown():
1085                             raise SFTPError(FX_PERMISSION_DENIED,
1086                                             "cannot open an unknown cap. Upgrading the gateway "
1087                                             "to a later Tahoe-LAFS version may help")
1088                         if not IFileNode.providedBy(filenode):
1089                             raise SFTPError(FX_PERMISSION_DENIED,
1090                                             "cannot open a directory as if it were a file")
1091                         if (flags & FXF_WRITE) and filenode.is_mutable() and filenode.is_readonly():
1092                             raise SFTPError(FX_PERMISSION_DENIED,
1093                                             "cannot open a read-only mutable file for writing")
1094                         if (flags & FXF_WRITE) and parent_readonly:
1095                             raise SFTPError(FX_PERMISSION_DENIED,
1096                                             "cannot open a file for writing when the parent directory is read-only")
1097
1098                         metadata['readonly'] = _is_readonly(parent_readonly, filenode)
1099                         return self._make_file(flags, parent=parent, childname=childname, filenode=filenode, metadata=metadata)
1100                     def _no_child(f):
1101                         if noisy: self.log("_no_child(%r)" % (f,), level=NOISY)
1102                         f.trap(NoSuchChildError)
1103
1104                         if not (flags & FXF_CREAT):
1105                             raise SFTPError(FX_NO_SUCH_FILE,
1106                                             "the file does not exist, and was not opened with the creation (CREAT) flag")
1107                         if parent_readonly:
1108                             raise SFTPError(FX_PERMISSION_DENIED,
1109                                             "cannot create a file when the parent directory is read-only")
1110
1111                         return self._make_file(flags, parent=parent, childname=childname)
1112                     d3.addCallbacks(_got_child, _no_child)
1113                     return d3
1114
1115                 d2.addCallback(_got_parent)
1116                 return d2
1117
1118         d.addCallback(_got_root)
1119         d.addBoth(_convert_error, request)
1120         return d
1121
1122     def renameFile(self, from_pathstring, to_pathstring, overwrite=False):
1123         request = ".renameFile(%r, %r)" % (from_pathstring, to_pathstring)
1124         self.log(request, level=OPERATIONAL)
1125
1126         from_path = self._path_from_string(from_pathstring)
1127         to_path = self._path_from_string(to_pathstring)
1128
1129         # the target directory must already exist
1130         d = deferredutil.gatherResults([self._get_parent_or_node(from_path),
1131                                         self._get_parent_or_node(to_path)])
1132         def _got( (from_pair, to_pair) ):
1133             if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r)" %
1134                                (from_pair, to_pair, from_pathstring, to_pathstring), level=NOISY)
1135             (from_parent, from_childname) = from_pair
1136             (to_parent, to_childname) = to_pair
1137             
1138             if from_childname is None:
1139                 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename a source object specified by URI")
1140             if to_childname is None:
1141                 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename to a destination specified by URI")
1142
1143             # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.5>
1144             # "It is an error if there already exists a file with the name specified
1145             #  by newpath."
1146             # For the standard SSH_FXP_RENAME operation, overwrite=False.
1147             # We also support the extposix-rename@openssh.com extension, which uses overwrite=True.
1148
1149             # FIXME: use move_child_to_path to avoid possible data loss due to #943
1150             #d2 = from_parent.move_child_to_path(from_childname, to_root, to_path, overwrite=overwrite)
1151
1152             d2 = from_parent.move_child_to(from_childname, to_parent, to_childname, overwrite=overwrite)
1153             def _check(err):
1154                 if noisy: self.log("_check(%r) in .renameFile(%r, %r)" %
1155                                    (err, from_pathstring, to_pathstring), level=NOISY)
1156
1157                 if not isinstance(err, Failure) or err.check(NoSuchChildError):
1158                     # If there are open files to be written at the 'from' direntry, then ensure
1159                     # they will now be written at the 'to' direntry instead.
1160                     if noisy: self.log("checking open files:\nself._open_files = %r\nall_open_files = %r" %
1161                                        (self._open_files, all_open_files), level=NOISY)
1162                     if self._rename_open_files(from_parent, from_childname, to_parent, to_childname):
1163                         # suppress the NoSuchChildError if any open files were renamed
1164                         if noisy: self.log("after renaming:\nself._open_files = %r\nall_open_files = %r" %
1165                                            (self._open_files, all_open_files), level=NOISY)
1166                         return None
1167                 elif err.check(ExistingChildError):
1168                     # OpenSSH SFTP server returns FX_PERMISSION_DENIED
1169                     raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_pathstring)
1170
1171                 return err
1172             d2.addBoth(_check)
1173             return d2
1174         d.addCallback(_got)
1175         d.addBoth(_convert_error, request)
1176         return d
1177
1178     def makeDirectory(self, pathstring, attrs):
1179         request = ".makeDirectory(%r, %r)" % (pathstring, attrs)
1180         self.log(request, level=OPERATIONAL)
1181
1182         path = self._path_from_string(pathstring)
1183         metadata = self._attrs_to_metadata(attrs)
1184         d = self._get_root(path)
1185         d.addCallback(lambda (root, path):
1186                       self._get_or_create_directories(root, path, metadata))
1187         d.addBoth(_convert_error, request)
1188         return d
1189
1190     def _get_or_create_directories(self, node, path, metadata):
1191         if not IDirectoryNode.providedBy(node):
1192             # TODO: provide the name of the blocking file in the error message.
1193             def _blocked(): raise SFTPError(FX_FAILURE, "cannot create directory because there "
1194                                                         "is a file in the way") # close enough
1195             return defer.execute(_blocked)
1196
1197         if not path:
1198             return defer.succeed(node)
1199         d = node.get(path[0])
1200         def _maybe_create(f):
1201             f.trap(NoSuchChildError)
1202             return node.create_subdirectory(path[0])
1203         d.addErrback(_maybe_create)
1204         d.addCallback(self._get_or_create_directories, path[1:], metadata)
1205         return d
1206
1207     def removeFile(self, pathstring):
1208         request = ".removeFile(%r)" % (pathstring,)
1209         self.log(request, level=OPERATIONAL)
1210
1211         path = self._path_from_string(pathstring)
1212         d = self._remove_object(path, must_be_file=True)
1213         d.addBoth(_convert_error, request)
1214         return d
1215
1216     def removeDirectory(self, pathstring):
1217         request = ".removeDirectory(%r)" % (pathstring,)
1218         self.log(request, level=OPERATIONAL)
1219
1220         path = self._path_from_string(pathstring)
1221         d = self._remove_object(path, must_be_directory=True)
1222         d.addBoth(_convert_error, request)
1223         return d
1224
1225     def _remove_object(self, path, must_be_directory=False, must_be_file=False):
1226         d = defer.maybeDeferred(self._get_parent_or_node, path)
1227         def _got_parent( (parent, childname) ):
1228             # FIXME (minor): there is a race condition between the 'get' and 'delete',
1229             # so it is possible that the must_be_directory or must_be_file restrictions
1230             # might not be enforced correctly if the type has just changed.
1231
1232             if childname is None:
1233                 raise SFTPError(FX_NO_SUCH_FILE, "cannot delete an object specified by URI")
1234
1235             d2 = parent.get(childname)
1236             def _got_child(child):
1237                 # Unknown children can be removed by either removeFile or removeDirectory.
1238                 if must_be_directory and IFileNode.providedBy(child):
1239                     raise SFTPError(FX_PERMISSION_DENIED, "rmdir called on a file")
1240                 if must_be_file and IDirectoryNode.providedBy(child):
1241                     raise SFTPError(FX_PERMISSION_DENIED, "rmfile called on a directory")
1242                 return parent.delete(childname)
1243             d2.addCallback(_got_child)
1244             return d2
1245         d.addCallback(_got_parent)
1246         return d
1247
1248     def openDirectory(self, pathstring):
1249         request = ".openDirectory(%r)" % (pathstring,)
1250         self.log(request, level=OPERATIONAL)
1251
1252         path = self._path_from_string(pathstring)
1253         d = self._get_parent_or_node(path)
1254         def _got_parent_or_node( (parent_or_node, childname) ):
1255             if noisy: self.log("_got_parent_or_node( (%r, %r) ) in openDirectory(%r)" %
1256                                (parent_or_node, childname, pathstring), level=NOISY)
1257             if childname is None:
1258                 return parent_or_node
1259             else:
1260                 return parent_or_node.get(childname)
1261         d.addCallback(_got_parent_or_node)
1262         def _list(dirnode):
1263             if dirnode.is_unknown():
1264                 raise SFTPError(FX_PERMISSION_DENIED,
1265                                 "cannot list an unknown cap as a directory. Upgrading the gateway "
1266                                 "to a later Tahoe-LAFS version may help")
1267             if not IDirectoryNode.providedBy(dirnode):
1268                 raise SFTPError(FX_PERMISSION_DENIED,
1269                                 "cannot list a file as if it were a directory")
1270
1271             d2 = dirnode.list()
1272             def _render(children):
1273                 parent_readonly = dirnode.is_readonly()
1274                 results = []
1275                 for filename, (child, metadata) in children.iteritems():
1276                     # The file size may be cached or absent.
1277                     metadata['readonly'] = _is_readonly(parent_readonly, child)
1278                     attrs = _populate_attrs(child, metadata)
1279                     filename_utf8 = filename.encode('utf-8')
1280                     longname = _lsLine(filename_utf8, attrs)
1281                     results.append( (filename_utf8, longname, attrs) )
1282                 return StoppableList(results)
1283             d2.addCallback(_render)
1284             return d2
1285         d.addCallback(_list)
1286         d.addBoth(_convert_error, request)
1287         return d
1288
1289     def getAttrs(self, pathstring, followLinks):
1290         request = ".getAttrs(%r, followLinks=%r)" % (pathstring, followLinks)
1291         self.log(request, level=OPERATIONAL)
1292
1293         # When asked about a specific file, report its current size.
1294         # TODO: the modification time for a mutable file should be
1295         # reported as the update time of the best version. But that
1296         # information isn't currently stored in mutable shares, I think.
1297
1298         path = self._path_from_string(pathstring)
1299         d = self._get_parent_or_node(path)
1300         def _got_parent_or_node( (parent_or_node, childname) ):
1301             if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
1302             if childname is None:
1303                 node = parent_or_node
1304                 d2 = node.get_current_size()
1305                 d2.addCallback(lambda size:
1306                                _populate_attrs(node, {'readonly': node.is_unknown() or node.is_readonly()}, size=size))
1307                 return d2
1308             else:
1309                 parent = parent_or_node
1310                 d2 = parent.get_child_and_metadata_at_path([childname])
1311                 def _got( (child, metadata) ):
1312                     assert IDirectoryNode.providedBy(parent), parent
1313                     metadata['readonly'] = _is_readonly(parent.is_readonly(), child)
1314                     d3 = child.get_current_size()
1315                     d3.addCallback(lambda size: _populate_attrs(child, metadata, size=size))
1316                     return d3
1317                 def _nosuch(err):
1318                     err.trap(NoSuchChildError)
1319                     direntry = self._direntry_for(parent, childname)
1320                     if direntry in all_open_files:
1321                         (files, opentime) = all_open_files[direntry]
1322                         # A file that has been opened for writing necessarily has permissions rw-rw-rw-.
1323                         return {'permissions': S_IFREG | 0666,
1324                                 'size': 0,
1325                                 'createtime': opentime,
1326                                 'ctime': opentime,
1327                                 'mtime': opentime,
1328                                 'atime': opentime,
1329                                }
1330                     return err
1331                 d2.addCallbacks(_got, _nosuch)
1332                 return d2
1333         d.addCallback(_got_parent_or_node)
1334         d.addBoth(_convert_error, request)
1335         return d
1336
1337     def setAttrs(self, pathstring, attrs):
1338         self.log(".setAttrs(%r, %r)" % (pathstring, attrs), level=OPERATIONAL)
1339
1340         if "size" in attrs:
1341             # this would require us to download and re-upload the truncated/extended
1342             # file contents
1343             def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "setAttrs wth size attribute")
1344             return defer.execute(_unsupported)
1345         return defer.succeed(None)
1346
1347     def readLink(self, pathstring):
1348         self.log(".readLink(%r)" % (pathstring,), level=OPERATIONAL)
1349
1350         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "readLink")
1351         return defer.execute(_unsupported)
1352
1353     def makeLink(self, linkPathstring, targetPathstring):
1354         self.log(".makeLink(%r, %r)" % (linkPathstring, targetPathstring), level=OPERATIONAL)
1355
1356         # If this is implemented, note the reversal of arguments described in point 7 of
1357         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>.
1358
1359         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "makeLink")
1360         return defer.execute(_unsupported)
1361
1362     def extendedRequest(self, extensionName, extensionData):
1363         self.log(".extendedRequest(%r, <data of length %r>)" % (extensionName, len(extensionData)), level=OPERATIONAL)
1364
1365         # We implement the three main OpenSSH SFTP extensions; see
1366         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
1367
1368         if extensionName == 'extposix-rename@openssh.com':
1369             def _bad(): raise SFTPError(FX_BAD_MESSAGE, "could not parse extposix-rename@openssh.com request")
1370
1371             (fromPathLen,) = struct.unpack('>L', extensionData[0:4])
1372             if 8 + fromPathLen > len(extensionData): return defer.execute(_bad)
1373
1374             (toPathLen,) = struct.unpack('>L', extensionData[(4 + fromPathLen):(8 + fromPathLen)])
1375             if 8 + fromPathLen + toPathLen != len(extensionData): return defer.execute(_bad)
1376
1377             fromPathstring = extensionData[4:(4 + fromPathLen)]
1378             toPathstring = extensionData[(8 + fromPathLen):]
1379             return self.renameFile(fromPathstring, toPathstring, overwrite=True)
1380
1381         if extensionName == 'statvfs@openssh.com' or extensionName == 'fstatvfs@openssh.com':
1382             return defer.succeed(struct.pack('>11Q',
1383                 1024,         # uint64  f_bsize     /* file system block size */
1384                 1024,         # uint64  f_frsize    /* fundamental fs block size */
1385                 628318530,    # uint64  f_blocks    /* number of blocks (unit f_frsize) */
1386                 314159265,    # uint64  f_bfree     /* free blocks in file system */
1387                 314159265,    # uint64  f_bavail    /* free blocks for non-root */
1388                 200000000,    # uint64  f_files     /* total file inodes */
1389                 100000000,    # uint64  f_ffree     /* free file inodes */
1390                 100000000,    # uint64  f_favail    /* free file inodes for non-root */
1391                 0x1AF5,       # uint64  f_fsid      /* file system id */
1392                 2,            # uint64  f_flag      /* bit mask = ST_NOSUID; not ST_RDONLY */
1393                 65535,        # uint64  f_namemax   /* maximum filename length */
1394                 ))
1395
1396         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "unsupported %r request <data of length %r>" %
1397                                                                (extensionName, len(extensionData)))
1398         return defer.execute(_unsupported)
1399
1400     def realPath(self, pathstring):
1401         self.log(".realPath(%r)" % (pathstring,), level=OPERATIONAL)
1402
1403         path_utf8 = [p.encode('utf-8') for p in self._path_from_string(pathstring)]
1404         return "/" + "/".join(path_utf8)
1405
1406     def _path_from_string(self, pathstring):
1407         if noisy: self.log("CONVERT %r" % (pathstring,), level=NOISY)
1408
1409         # The home directory is the root directory.
1410         pathstring = pathstring.strip("/")
1411         if pathstring == "" or pathstring == ".":
1412             path_utf8 = []
1413         else:
1414             path_utf8 = pathstring.split("/")
1415
1416         # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.2>
1417         # "Servers SHOULD interpret a path name component ".." as referring to
1418         #  the parent directory, and "." as referring to the current directory."
1419         path = []
1420         for p_utf8 in path_utf8:
1421             if p_utf8 == "..":
1422                 # ignore excess .. components at the root
1423                 if len(path) > 0:
1424                     path = path[:-1]
1425             elif p_utf8 != ".":
1426                 try:
1427                     p = p_utf8.decode('utf-8', 'strict')
1428                 except UnicodeError:
1429                     raise SFTPError(FX_NO_SUCH_FILE, "path could not be decoded as UTF-8")
1430                 path.append(p)
1431
1432         if noisy: self.log(" PATH %r" % (path,), level=NOISY)
1433         return path
1434
1435     def _get_root(self, path):
1436         # return Deferred (root, remaining_path)
1437         if path and path[0] == u"uri":
1438             d = defer.maybeDeferred(self._client.create_node_from_uri, path[1].encode('utf-8'))
1439             d.addCallback(lambda root: (root, path[2:]))
1440         else:
1441             d = defer.succeed((self._root, path))
1442         return d
1443
1444     def _get_parent_or_node(self, path):
1445         # return Deferred (parent, childname) or (node, None)
1446         d = self._get_root(path)
1447         def _got_root( (root, remaining_path) ):
1448             if not remaining_path:
1449                 return (root, None)
1450             else:
1451                 d2 = root.get_child_at_path(remaining_path[:-1])
1452                 d2.addCallback(lambda parent: (parent, remaining_path[-1]))
1453                 return d2
1454         d.addCallback(_got_root)
1455         return d
1456
1457     def _attrs_to_metadata(self, attrs):
1458         metadata = {}
1459
1460         for key in attrs:
1461             if key == "mtime" or key == "ctime" or key == "createtime":
1462                 metadata[key] = long(attrs[key])
1463             elif key.startswith("ext_"):
1464                 metadata[key] = str(attrs[key])
1465
1466         return metadata
1467
1468
1469 class SFTPUser(ConchUser, PrefixingLogMixin):
1470     implements(ISession)
1471     def __init__(self, check_abort, client, rootnode, username, convergence):
1472         ConchUser.__init__(self)
1473         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
1474
1475         self.channelLookup["session"] = session.SSHSession
1476         self.subsystemLookup["sftp"] = FileTransferServer
1477
1478         self.check_abort = check_abort
1479         self.client = client
1480         self.root = rootnode
1481         self.username = username
1482         self.convergence = convergence
1483
1484     def getPty(self, terminal, windowSize, attrs):
1485         self.log(".getPty(%r, %r, %r)" % (terminal, windowSize, attrs), level=OPERATIONAL)
1486         raise NotImplementedError
1487
1488     def openShell(self, protocol):
1489         self.log(".openShell(%r)" % (protocol,), level=OPERATIONAL)
1490         raise NotImplementedError
1491
1492     def execCommand(self, protocol, cmd):
1493         self.log(".execCommand(%r, %r)" % (protocol, cmd), level=OPERATIONAL)
1494         raise NotImplementedError
1495
1496     def windowChanged(self, newWindowSize):
1497         self.log(".windowChanged(%r)" % (newWindowSize,), level=OPERATIONAL)
1498
1499     def eofReceived():
1500         self.log(".eofReceived()", level=OPERATIONAL)
1501
1502     def closed(self):
1503         self.log(".closed()", level=OPERATIONAL)
1504
1505
1506 # if you have an SFTPUser, and you want something that provides ISFTPServer,
1507 # then you get SFTPHandler(user)
1508 components.registerAdapter(SFTPHandler, SFTPUser, ISFTPServer)
1509
1510 from auth import AccountURLChecker, AccountFileChecker, NeedRootcapLookupScheme
1511
1512 class Dispatcher:
1513     implements(portal.IRealm)
1514     def __init__(self, client):
1515         self._client = client
1516
1517     def requestAvatar(self, avatarID, mind, interface):
1518         assert interface == IConchUser, interface
1519         rootnode = self._client.create_node_from_uri(avatarID.rootcap)
1520         handler = SFTPUserHandler(self._client, rootnode, avatarID.username)
1521         return (interface, handler, handler.logout)
1522
1523
1524 class SFTPServer(service.MultiService):
1525     def __init__(self, client, accountfile, accounturl,
1526                  sftp_portstr, pubkey_file, privkey_file):
1527         service.MultiService.__init__(self)
1528
1529         r = Dispatcher(client)
1530         p = portal.Portal(r)
1531
1532         if accountfile:
1533             c = AccountFileChecker(self, accountfile)
1534             p.registerChecker(c)
1535         if accounturl:
1536             c = AccountURLChecker(self, accounturl)
1537             p.registerChecker(c)
1538         if not accountfile and not accounturl:
1539             # we could leave this anonymous, with just the /uri/CAP form
1540             raise NeedRootcapLookupScheme("must provide an account file or URL")
1541
1542         pubkey = keys.Key.fromFile(pubkey_file)
1543         privkey = keys.Key.fromFile(privkey_file)
1544         class SSHFactory(factory.SSHFactory):
1545             publicKeys = {pubkey.sshType(): pubkey}
1546             privateKeys = {privkey.sshType(): privkey}
1547             def getPrimes(self):
1548                 try:
1549                     # if present, this enables diffie-hellman-group-exchange
1550                     return primes.parseModuliFile("/etc/ssh/moduli")
1551                 except IOError:
1552                     return None
1553
1554         f = SSHFactory()
1555         f.portal = p
1556
1557         s = strports.service(sftp_portstr, f)
1558         s.setServiceParent(self)