]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/sftpd.py
SFTP: avoid race condition where .write could be called on an OverwriteableFileConsum...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / sftpd.py
1
2 import os, tempfile, heapq, binascii, traceback, array, stat, struct
3 from stat import S_IFREG, S_IFDIR
4 from time import time, strftime, localtime
5
6 from zope.interface import implements
7 from twisted.python import components
8 from twisted.application import service, strports
9 from twisted.conch.ssh import factory, keys, session
10 from twisted.conch.ssh.filetransfer import FileTransferServer, SFTPError, \
11      FX_NO_SUCH_FILE, FX_OP_UNSUPPORTED, FX_PERMISSION_DENIED, FX_EOF, \
12      FX_BAD_MESSAGE, FX_FAILURE
13 from twisted.conch.ssh.filetransfer import FXF_READ, FXF_WRITE, FXF_APPEND, \
14      FXF_CREAT, FXF_TRUNC, FXF_EXCL
15 from twisted.conch.interfaces import ISFTPServer, ISFTPFile, IConchUser, ISession
16 from twisted.conch.avatar import ConchUser
17 from twisted.conch.openssh_compat import primes
18 from twisted.cred import portal
19 from twisted.internet.error import ProcessDone, ProcessTerminated
20 from twisted.python.failure import Failure
21 from twisted.internet.interfaces import ITransport
22
23 from twisted.internet import defer
24 from twisted.internet.interfaces import IFinishableConsumer
25 from foolscap.api import eventually
26 from allmydata.util import deferredutil
27
28 from allmydata.util.consumer import download_to_data
29 from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
30      NoSuchChildError
31 from allmydata.mutable.common import NotWriteableError
32 from allmydata.immutable.upload import FileHandle
33
34 from pycryptopp.cipher.aes import AES
35
36 # twisted.conch.ssh.filetransfer generates this warning, but not when it is imported,
37 # only on an error.
38 import warnings
39 warnings.filterwarnings("ignore", category=DeprecationWarning,
40     message="BaseException.message has been deprecated as of Python 2.6",
41     module=".*filetransfer", append=True)
42
43 noisy = True
44 use_foolscap_logging = True
45
46 from allmydata.util.log import NOISY, OPERATIONAL, \
47     msg as _msg, err as _err, PrefixingLogMixin as _PrefixingLogMixin
48
49 if use_foolscap_logging:
50     (logmsg, logerr, PrefixingLogMixin) = (_msg, _err, _PrefixingLogMixin)
51 else:  # pragma: no cover
52     def logmsg(s, level=None):
53         print s
54     def logerr(s, level=None):
55         print s
56     class PrefixingLogMixin:
57         def __init__(self, facility=None):
58             pass
59         def log(self, s, level=None):
60             print s
61
62
63 def eventually_callback(d):
64     return lambda res: eventually(d.callback, res)
65
66 def eventually_errback(d):
67     return lambda err: eventually(d.errback, err)
68
69
70 def _utf8(x):
71     if isinstance(x, unicode):
72         return x.encode('utf-8')
73     if isinstance(x, str):
74         return x
75     return repr(x)
76
77
78 def _convert_error(res, request):
79     if not isinstance(res, Failure):
80         logged_res = res
81         if isinstance(res, str): logged_res = "<data of length %r>" % (len(res),)
82         logmsg("SUCCESS %r %r" % (request, logged_res,), level=OPERATIONAL)
83         return res
84
85     err = res
86     logmsg("RAISE %r %r" % (request, err.value), level=OPERATIONAL)
87     try:
88         if noisy: logmsg(traceback.format_exc(err.value), level=NOISY)
89     except:
90         pass
91
92     # The message argument to SFTPError must not reveal information that
93     # might compromise anonymity.
94
95     if err.check(SFTPError):
96         # original raiser of SFTPError has responsibility to ensure anonymity
97         raise err
98     if err.check(NoSuchChildError):
99         childname = _utf8(err.value.args[0])
100         raise SFTPError(FX_NO_SUCH_FILE, childname)
101     if err.check(NotWriteableError):
102         msg = _utf8(err.value.args[0])
103         raise SFTPError(FX_PERMISSION_DENIED, msg)
104     if err.check(ExistingChildError):
105         # Versions of SFTP after v3 (which is what twisted.conch implements)
106         # define a specific error code for this case: FX_FILE_ALREADY_EXISTS.
107         # However v3 doesn't; instead, other servers such as sshd return
108         # FX_FAILURE. The gvfs SFTP backend, for example, depends on this
109         # to translate the error to the equivalent of POSIX EEXIST, which is
110         # necessary for some picky programs (such as gedit).
111         msg = _utf8(err.value.args[0])
112         raise SFTPError(FX_FAILURE, msg)
113     if err.check(NotImplementedError):
114         raise SFTPError(FX_OP_UNSUPPORTED, _utf8(err.value))
115     if err.check(EOFError):
116         raise SFTPError(FX_EOF, "end of file reached")
117     if err.check(defer.FirstError):
118         _convert_error(err.value.subFailure, request)
119
120     # We assume that the error message is not anonymity-sensitive.
121     raise SFTPError(FX_FAILURE, _utf8(err.value))
122
123
124 def _repr_flags(flags):
125     return "|".join([f for f in
126                      [(flags & FXF_READ)   and "FXF_READ"   or None,
127                       (flags & FXF_WRITE)  and "FXF_WRITE"  or None,
128                       (flags & FXF_APPEND) and "FXF_APPEND" or None,
129                       (flags & FXF_CREAT)  and "FXF_CREAT"  or None,
130                       (flags & FXF_TRUNC)  and "FXF_TRUNC"  or None,
131                       (flags & FXF_EXCL)   and "FXF_EXCL"   or None,
132                      ]
133                      if f])
134
135
136 def _lsLine(name, attrs):
137     st_uid = "tahoe"
138     st_gid = "tahoe"
139     st_mtime = attrs.get("mtime", 0)
140     st_mode = attrs["permissions"]
141     # TODO: check that clients are okay with this being a "?".
142     # (They should be because the longname is intended for human
143     # consumption.)
144     st_size = attrs.get("size", "?")
145     # We don't know how many links there really are to this object.
146     st_nlink = 1
147
148     # Based on <http://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
149     # We can't call the version in Twisted because we might have a version earlier than
150     # <http://twistedmatrix.com/trac/changeset/25412> (released in Twisted 8.2).
151
152     mode = st_mode
153     perms = array.array('c', '-'*10)
154     ft = stat.S_IFMT(mode)
155     if   stat.S_ISDIR(ft):  perms[0] = 'd'
156     elif stat.S_ISCHR(ft):  perms[0] = 'c'
157     elif stat.S_ISBLK(ft):  perms[0] = 'b'
158     elif stat.S_ISREG(ft):  perms[0] = '-'
159     elif stat.S_ISFIFO(ft): perms[0] = 'f'
160     elif stat.S_ISLNK(ft):  perms[0] = 'l'
161     elif stat.S_ISSOCK(ft): perms[0] = 's'
162     else: perms[0] = '?'
163     # user
164     if mode&stat.S_IRUSR: perms[1] = 'r'
165     if mode&stat.S_IWUSR: perms[2] = 'w'
166     if mode&stat.S_IXUSR: perms[3] = 'x'
167     # group
168     if mode&stat.S_IRGRP: perms[4] = 'r'
169     if mode&stat.S_IWGRP: perms[5] = 'w'
170     if mode&stat.S_IXGRP: perms[6] = 'x'
171     # other
172     if mode&stat.S_IROTH: perms[7] = 'r'
173     if mode&stat.S_IWOTH: perms[8] = 'w'
174     if mode&stat.S_IXOTH: perms[9] = 'x'
175     # suid/sgid never set
176
177     l = perms.tostring()
178     l += str(st_nlink).rjust(5) + ' '
179     un = str(st_uid)
180     l += un.ljust(9)
181     gr = str(st_gid)
182     l += gr.ljust(9)
183     sz = str(st_size)
184     l += sz.rjust(8)
185     l += ' '
186     sixmo = 60 * 60 * 24 * 7 * 26
187     if st_mtime + sixmo < time(): # last edited more than 6mo ago
188         l += strftime("%b %d  %Y ", localtime(st_mtime))
189     else:
190         l += strftime("%b %d %H:%M ", localtime(st_mtime))
191     l += name
192     return l
193
194
195 def _is_readonly(parent_readonly, child):
196     """Whether child should be listed as having read-only permissions in parent."""
197
198     if child.is_unknown():
199         return True
200     elif child.is_mutable():
201         return child.is_readonly()
202     else:
203         return parent_readonly
204
205
206 def _populate_attrs(childnode, metadata, size=None):
207     attrs = {}
208
209     # The permissions must have the S_IFDIR (040000) or S_IFREG (0100000)
210     # bits, otherwise the client may refuse to open a directory.
211     # Also, sshfs run as a non-root user requires files and directories
212     # to be world-readable/writeable.
213     #
214     # Directories and unknown nodes have no size, and SFTP doesn't
215     # require us to make one up.
216     #
217     # childnode might be None, meaning that the file doesn't exist yet,
218     # but we're going to write it later.
219
220     if childnode and childnode.is_unknown():
221         perms = 0
222     elif childnode and IDirectoryNode.providedBy(childnode):
223         perms = S_IFDIR | 0777
224     else:
225         # For files, omit the size if we don't immediately know it.
226         if childnode and size is None:
227             size = childnode.get_size()
228         if size is not None:
229             assert isinstance(size, (int, long)) and not isinstance(size, bool), repr(size)
230             attrs['size'] = size
231         perms = S_IFREG | 0666
232
233     if metadata:
234         assert 'readonly' in metadata, metadata
235         if metadata['readonly']:
236             perms &= S_IFDIR | S_IFREG | 0555  # clear 'w' bits
237
238         # see webapi.txt for what these times mean
239         if 'linkmotime' in metadata.get('tahoe', {}):
240             attrs['mtime'] = int(metadata['tahoe']['linkmotime'])
241         elif 'mtime' in metadata:
242             # We would prefer to omit atime, but SFTP version 3 can only
243             # accept mtime if atime is also set.
244             attrs['mtime'] = int(metadata['mtime'])
245             attrs['atime'] = attrs['mtime']
246
247         if 'linkcrtime' in metadata.get('tahoe', {}):
248             attrs['createtime'] = int(metadata['tahoe']['linkcrtime'])
249
250         if 'ctime' in metadata:
251             attrs['ctime'] = int(metadata['ctime'])
252
253     attrs['permissions'] = perms
254
255     # twisted.conch.ssh.filetransfer only implements SFTP version 3,
256     # which doesn't include SSH_FILEXFER_ATTR_FLAGS.
257
258     return attrs
259
260
261 class EncryptedTemporaryFile(PrefixingLogMixin):
262     # not implemented: next, readline, readlines, xreadlines, writelines
263
264     def __init__(self):
265         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
266         self.file = tempfile.TemporaryFile()
267         self.key = os.urandom(16)  # AES-128
268
269     def _crypt(self, offset, data):
270         # FIXME: use random-access AES (pycryptopp ticket #18)
271         offset_big = offset // 16
272         offset_small = offset % 16
273         iv = binascii.unhexlify("%032x" % offset_big)
274         cipher = AES(self.key, iv=iv)
275         cipher.process("\x00"*offset_small)
276         return cipher.process(data)
277
278     def close(self):
279         self.file.close()
280
281     def flush(self):
282         self.file.flush()
283
284     def seek(self, offset, whence=os.SEEK_SET):
285         if noisy: self.log(".seek(%r, %r)" % (offset, whence), level=NOISY)
286         self.file.seek(offset, whence)
287
288     def tell(self):
289         offset = self.file.tell()
290         if noisy: self.log(".tell() = %r" % (offset,), level=NOISY)
291         return offset
292
293     def read(self, size=-1):
294         if noisy: self.log(".read(%r)" % (size,), level=NOISY)
295         index = self.file.tell()
296         ciphertext = self.file.read(size)
297         plaintext = self._crypt(index, ciphertext)
298         return plaintext
299
300     def write(self, plaintext):
301         if noisy: self.log(".write(<data of length %r>)" % (len(plaintext),), level=NOISY)
302         index = self.file.tell()
303         ciphertext = self._crypt(index, plaintext)
304         self.file.write(ciphertext)
305
306     def truncate(self, newsize):
307         if noisy: self.log(".truncate(%r)" % (newsize,), level=NOISY)
308         self.file.truncate(newsize)
309
310
311 class OverwriteableFileConsumer(PrefixingLogMixin):
312     implements(IFinishableConsumer)
313     """I act both as a consumer for the download of the original file contents, and as a
314     wrapper for a temporary file that records the downloaded data and any overwrites.
315     I use a priority queue to keep track of which regions of the file have been overwritten
316     but not yet downloaded, so that the download does not clobber overwritten data.
317     I use another priority queue to record milestones at which to make callbacks
318     indicating that a given number of bytes have been downloaded.
319
320     The temporary file reflects the contents of the file that I represent, except that:
321      - regions that have neither been downloaded nor overwritten, if present,
322        contain zeroes.
323      - the temporary file may be shorter than the represented file (it is never longer).
324        The latter's current size is stored in self.current_size.
325
326     This abstraction is mostly independent of SFTP. Consider moving it, if it is found
327     useful for other frontends."""
328
329     def __init__(self, check_abort, download_size, tempfile_maker):
330         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
331         if noisy: self.log(".__init__(%r, %r, %r)" % (check_abort, download_size, tempfile_maker), level=NOISY)
332         self.check_abort = check_abort
333         self.download_size = download_size
334         self.current_size = download_size
335         self.f = tempfile_maker()
336         self.downloaded = 0
337         self.milestones = []  # empty heap of (offset, d)
338         self.overwrites = []  # empty heap of (start, end)
339         self.is_closed = False
340         self.done = self.when_reached(download_size)  # adds a milestone
341         self.is_done = False
342         def _signal_done(ign):
343             if noisy: self.log("DONE", level=NOISY)
344             self.is_done = True
345         self.done.addCallback(_signal_done)
346         self.producer = None
347
348     def get_file(self):
349         return self.f
350
351     def get_current_size(self):
352         return self.current_size
353
354     def set_current_size(self, size):
355         if noisy: self.log(".set_current_size(%r), current_size = %r, downloaded = %r" %
356                            (size, self.current_size, self.downloaded), level=NOISY)
357         if size < self.current_size or size < self.downloaded:
358             self.f.truncate(size)
359         self.current_size = size
360         if size < self.download_size:
361             self.download_size = size
362         if self.downloaded >= self.download_size:
363             self.finish()
364
365     def registerProducer(self, p, streaming):
366         if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY)
367         self.producer = p
368         if streaming:
369             # call resumeProducing once to start things off
370             p.resumeProducing()
371         else:
372             def _iterate():
373                 if not self.is_done:
374                     p.resumeProducing()
375                     eventually(_iterate)
376             _iterate()
377
378     def write(self, data):
379         if noisy: self.log(".write(<data of length %r>)" % (len(data),), level=NOISY)
380         if self.is_closed:
381             return
382         if self.check_abort():
383             self.close()
384             return
385
386         if self.downloaded >= self.download_size:
387             return
388
389         next_downloaded = self.downloaded + len(data)
390         if next_downloaded > self.download_size:
391             data = data[:(self.download_size - self.downloaded)]
392
393         while len(self.overwrites) > 0:
394             (start, end) = self.overwrites[0]
395             if start >= next_downloaded:
396                 # This and all remaining overwrites are after the data we just downloaded.
397                 break
398             if start > self.downloaded:
399                 # The data we just downloaded has been partially overwritten.
400                 # Write the prefix of it that precedes the overwritten region.
401                 self.f.seek(self.downloaded)
402                 self.f.write(data[:(start - self.downloaded)])
403
404             # This merges consecutive overwrites if possible, which allows us to detect the
405             # case where the download can be stopped early because the remaining region
406             # to download has already been fully overwritten.
407             heapq.heappop(self.overwrites)
408             while len(self.overwrites) > 0:
409                 (start1, end1) = self.overwrites[0]
410                 if start1 > end:
411                     break
412                 end = end1
413                 heapq.heappop(self.overwrites)
414
415             if end >= next_downloaded:
416                 # This overwrite extends past the downloaded data, so there is no
417                 # more data to consider on this call.
418                 heapq.heappush(self.overwrites, (next_downloaded, end))
419                 self._update_downloaded(next_downloaded)
420                 return
421             elif end >= self.downloaded:
422                 data = data[(end - self.downloaded):]
423                 self._update_downloaded(end)
424
425         self.f.seek(self.downloaded)
426         self.f.write(data)
427         self._update_downloaded(next_downloaded)
428
429     def _update_downloaded(self, new_downloaded):
430         self.downloaded = new_downloaded
431         milestone = new_downloaded
432         if len(self.overwrites) > 0:
433             (start, end) = self.overwrites[0]
434             if start <= new_downloaded and end > milestone:
435                 milestone = end
436
437         while len(self.milestones) > 0:
438             (next, d) = self.milestones[0]
439             if next > milestone:
440                 return
441             if noisy: self.log("MILESTONE %r %r" % (next, d), level=NOISY)
442             heapq.heappop(self.milestones)
443             eventually_callback(d)(None)
444
445         if milestone >= self.download_size:
446             self.finish()
447
448     def overwrite(self, offset, data):
449         if noisy: self.log(".overwrite(%r, <data of length %r>)" % (offset, len(data)), level=NOISY)
450         if offset > self.download_size and offset > self.current_size:
451             # Normally writing at an offset beyond the current end-of-file
452             # would leave a hole that appears filled with zeroes. However, an
453             # EncryptedTemporaryFile doesn't behave like that (if there is a
454             # hole in the file on disk, the zeroes that are read back will be
455             # XORed with the keystream). So we must explicitly write zeroes in
456             # the gap between the current EOF and the offset.
457
458             self.f.seek(self.current_size)
459             self.f.write("\x00" * (offset - self.current_size))            
460         else:
461             self.f.seek(offset)
462         self.f.write(data)
463         end = offset + len(data)
464         self.current_size = max(self.current_size, end)
465         if end > self.downloaded:
466             heapq.heappush(self.overwrites, (offset, end))
467
468     def read(self, offset, length):
469         """When the data has been read, callback the Deferred that we return with this data.
470         Otherwise errback the Deferred that we return.
471         The caller must perform no more overwrites until the Deferred has fired."""
472
473         if noisy: self.log(".read(%r, %r), current_size = %r" % (offset, length, self.current_size), level=NOISY)
474         if offset >= self.current_size:
475             def _eof(): raise EOFError("read past end of file")
476             return defer.execute(_eof)
477
478         if offset + length > self.current_size:
479             length = self.current_size - offset
480             if noisy: self.log("truncating read to %r bytes" % (length,), level=NOISY)
481
482         needed = min(offset + length, self.download_size)
483         d = self.when_reached(needed)
484         def _reached(ign):
485             # It is not necessarily the case that self.downloaded >= needed, because
486             # the file might have been truncated (thus truncating the download) and
487             # then extended.
488
489             assert self.current_size >= offset + length, (self.current_size, offset, length)
490             if noisy: self.log("self.f = %r" % (self.f,), level=NOISY)
491             self.f.seek(offset)
492             return self.f.read(length)
493         d.addCallback(_reached)
494         return d
495
496     def when_reached(self, index):
497         if noisy: self.log(".when_reached(%r)" % (index,), level=NOISY)
498         if index <= self.downloaded:  # already reached
499             if noisy: self.log("already reached %r" % (index,), level=NOISY)
500             return defer.succeed(None)
501         d = defer.Deferred()
502         def _reached(ign):
503             if noisy: self.log("reached %r" % (index,), level=NOISY)
504             return ign
505         d.addCallback(_reached)
506         heapq.heappush(self.milestones, (index, d))
507         return d
508
509     def when_done(self):
510         return self.done
511
512     def finish(self):
513         while len(self.milestones) > 0:
514             (next, d) = self.milestones[0]
515             if noisy: self.log("MILESTONE FINISH %r %r" % (next, d), level=NOISY)
516             heapq.heappop(self.milestones)
517             # The callback means that the milestone has been reached if
518             # it is ever going to be. Note that the file may have been
519             # truncated to before the milestone.
520             eventually_callback(d)(None)
521
522         # FIXME: causes spurious failures
523         #self.unregisterProducer()
524
525     def close(self):
526         self.is_closed = True
527         self.finish()
528         self.f.close()
529
530     def unregisterProducer(self):
531         if self.producer:
532             self.producer.stopProducing()
533             self.producer = None
534
535
536 SIZE_THRESHOLD = 1000
537
538
539 class ShortReadOnlySFTPFile(PrefixingLogMixin):
540     implements(ISFTPFile)
541     """I represent a file handle to a particular file on an SFTP connection.
542     I am used only for short immutable files opened in read-only mode.
543     The file contents are downloaded to memory when I am created."""
544
545     def __init__(self, filenode, metadata):
546         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
547         if noisy: self.log(".__init__(%r, %r)" % (filenode, metadata), level=NOISY)
548
549         assert IFileNode.providedBy(filenode), filenode
550         self.filenode = filenode
551         self.metadata = metadata
552         self.async = download_to_data(filenode)
553         self.closed = False
554
555     def readChunk(self, offset, length):
556         request = ".readChunk(%r, %r)" % (offset, length)
557         self.log(request, level=OPERATIONAL)
558
559         if self.closed:
560             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
561             return defer.execute(_closed)
562
563         d = defer.Deferred()
564         def _read(data):
565             if noisy: self.log("_read(%r) in readChunk(%r, %r)" % (data, offset, length), level=NOISY)
566
567             # "In response to this request, the server will read as many bytes as it
568             #  can from the file (up to 'len'), and return them in a SSH_FXP_DATA
569             #  message.  If an error occurs or EOF is encountered before reading any
570             #  data, the server will respond with SSH_FXP_STATUS.  For normal disk
571             #  files, it is guaranteed that this will read the specified number of
572             #  bytes, or up to end of file."
573             #
574             # i.e. we respond with an EOF error iff offset is already at EOF.
575
576             if offset >= len(data):
577                 eventually_errback(d)(SFTPError(FX_EOF, "read at or past end of file"))
578             else:
579                 eventually_callback(d)(data[offset:min(offset+length, len(data))])
580             return data
581         self.async.addCallbacks(_read, eventually_errback(d))
582         d.addBoth(_convert_error, request)
583         return d
584
585     def writeChunk(self, offset, data):
586         self.log(".writeChunk(%r, <data of length %r>) denied" % (offset, len(data)), level=OPERATIONAL)
587
588         def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
589         return defer.execute(_denied)
590
591     def close(self):
592         self.log(".close()", level=OPERATIONAL)
593
594         self.closed = True
595         return defer.succeed(None)
596
597     def getAttrs(self):
598         request = ".getAttrs()"
599         self.log(request, level=OPERATIONAL)
600
601         if self.closed:
602             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
603             return defer.execute(_closed)
604
605         d = defer.execute(_populate_attrs, self.filenode, self.metadata)
606         d.addBoth(_convert_error, request)
607         return d
608
609     def setAttrs(self, attrs):
610         self.log(".setAttrs(%r) denied" % (attrs,), level=OPERATIONAL)
611         def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
612         return defer.execute(_denied)
613
614
615 class GeneralSFTPFile(PrefixingLogMixin):
616     implements(ISFTPFile)
617     """I represent a file handle to a particular file on an SFTP connection.
618     I wrap an instance of OverwriteableFileConsumer, which is responsible for
619     storing the file contents. In order to allow write requests to be satisfied
620     immediately, there is effectively a FIFO queue between requests made to this
621     file handle, and requests to my OverwriteableFileConsumer. This queue is
622     implemented by the callback chain of self.async."""
623
624     def __init__(self, close_notify, check_abort, flags, convergence, parent=None, childname=None, filenode=None, metadata=None):
625         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
626         if noisy: self.log(".__init__(%r, %r, %r, <convergence censored>, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
627                            (close_notify, check_abort, flags, parent, childname, filenode, metadata), level=NOISY)
628
629         self.close_notify = close_notify
630         self.check_abort = check_abort
631         self.flags = flags
632         self.convergence = convergence
633         self.parent = parent
634         self.childname = childname
635         self.filenode = filenode
636         self.metadata = metadata
637         self.async = defer.succeed(None)
638         # Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created.
639         self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL)
640         self.closed = False
641         
642         # self.consumer should only be relied on in callbacks for self.async, since it might
643         # not be set before then.
644         self.consumer = None
645         tempfile_maker = EncryptedTemporaryFile
646
647         if (flags & FXF_TRUNC) or not filenode:
648             # We're either truncating or creating the file, so we don't need the old contents.
649             self.consumer = OverwriteableFileConsumer(self.check_abort, 0, tempfile_maker)
650             self.consumer.finish()
651         else:
652             assert IFileNode.providedBy(filenode), filenode
653
654             # TODO: use download interface described in #993 when implemented.
655             if filenode.is_mutable():
656                 self.async.addCallback(lambda ign: filenode.download_best_version())
657                 def _downloaded(data):
658                     self.consumer = OverwriteableFileConsumer(self.check_abort, len(data), tempfile_maker)
659                     self.consumer.write(data)
660                     self.consumer.finish()
661                     return None
662                 self.async.addCallback(_downloaded)
663             else:
664                 download_size = filenode.get_size()
665                 assert download_size is not None, "download_size is None"
666                 self.consumer = OverwriteableFileConsumer(self.check_abort, download_size, tempfile_maker)
667                 def _read(ign):
668                     if noisy: self.log("_read immutable", level=NOISY)
669                     filenode.read(self.consumer, 0, None)
670                 self.async.addCallback(_read)
671
672         if noisy: self.log("__init__ done", level=NOISY)
673
674     def rename(self, new_parent, new_childname):
675         self.log(".rename(%r, %r)" % (new_parent, new_childname), level=OPERATIONAL)
676
677         self.parent = new_parent
678         self.childname = new_childname
679
680     def readChunk(self, offset, length):
681         request = ".readChunk(%r, %r)" % (offset, length)
682         self.log(request, level=OPERATIONAL)
683
684         if not (self.flags & FXF_READ):
685             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for reading")
686             return defer.execute(_denied)
687
688         if self.closed:
689             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
690             return defer.execute(_closed)
691
692         d = defer.Deferred()
693         def _read(ign):
694             if noisy: self.log("_read in readChunk(%r, %r)" % (offset, length), level=NOISY)
695             d2 = self.consumer.read(offset, length)
696             d2.addErrback(_convert_error, request)
697             d2.addCallbacks(eventually_callback(d), eventually_errback(d))
698             # It is correct to drop d2 here.
699             return None
700         self.async.addCallbacks(_read, eventually_errback(d))
701         d.addBoth(_convert_error, request)
702         return d
703
704     def writeChunk(self, offset, data):
705         self.log(".writeChunk(%r, <data of length %r>)" % (offset, len(data)), level=OPERATIONAL)
706
707         if not (self.flags & FXF_WRITE):
708             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
709             return defer.execute(_denied)
710
711         if self.closed:
712             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")
713             return defer.execute(_closed)
714
715         self.has_changed = True
716
717         # Note that we return without waiting for the write to occur. Reads and
718         # close wait for prior writes, and will fail if any prior operation failed.
719         # This is ok because SFTP makes no guarantee that the request completes
720         # before the write. In fact it explicitly allows write errors to be delayed
721         # until close:
722         #   "One should note that on some server platforms even a close can fail.
723         #    This can happen e.g. if the server operating system caches writes,
724         #    and an error occurs while flushing cached writes during the close."
725
726         def _write(ign):
727             if noisy: self.log("_write in .writeChunk(%r, <data of length %r>), current_size = %r" %
728                                (offset, len(data), self.consumer.get_current_size()), level=NOISY)
729             # FXF_APPEND means that we should always write at the current end of file.
730             write_offset = offset
731             if self.flags & FXF_APPEND:
732                 write_offset = self.consumer.get_current_size()
733
734             self.consumer.overwrite(write_offset, data)
735             if noisy: self.log("overwrite done", level=NOISY)
736             return None
737         self.async.addCallback(_write)
738         # don't addErrback to self.async, just allow subsequent async ops to fail.
739         return defer.succeed(None)
740
741     def close(self):
742         request = ".close()"
743         self.log(request, level=OPERATIONAL)
744
745         if self.closed:
746             return defer.succeed(None)
747
748         # This means that close has been called, not that the close has succeeded.
749         self.closed = True
750
751         if not (self.flags & (FXF_WRITE | FXF_CREAT)):
752             return defer.execute(self.consumer.close)
753
754         def _close(ign):
755             d2 = defer.succeed(None)
756             if self.has_changed:
757                 d2.addCallback(lambda ign: self.consumer.when_done())
758                 if self.filenode and self.filenode.is_mutable():
759                     d2.addCallback(lambda ign: self.consumer.get_current_size())
760                     d2.addCallback(lambda size: self.consumer.read(0, size))
761                     d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
762                 else:
763                     def _add_file(ign):
764                         self.log("_add_file childname=%r" % (self.childname,), level=OPERATIONAL)
765                         u = FileHandle(self.consumer.get_file(), self.convergence)
766                         return self.parent.add_file(self.childname, u)
767                     d2.addCallback(_add_file)
768
769             d2.addCallback(lambda ign: self.consumer.close())
770             return d2
771         self.async.addCallback(_close)
772
773         d = defer.Deferred()
774         self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
775
776         def _closed(res):
777             self.close_notify(self.parent, self.childname, self)
778             return res
779         d.addBoth(_closed)
780         d.addBoth(_convert_error, request)
781         return d
782
783     def getAttrs(self):
784         request = ".getAttrs()"
785         self.log(request, level=OPERATIONAL)
786
787         if self.closed:
788             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
789             return defer.execute(_closed)
790
791         # Optimization for read-only handles, when we already know the metadata.
792         if not(self.flags & (FXF_WRITE | FXF_CREAT)) and self.metadata and self.filenode and not self.filenode.is_mutable():
793             return defer.succeed(_populate_attrs(self.filenode, self.metadata))
794
795         d = defer.Deferred()
796         def _get(ign):
797             # self.filenode might be None, but that's ok.
798             attrs = _populate_attrs(self.filenode, self.metadata, size=self.consumer.get_current_size())
799             eventually_callback(d)(attrs)
800             return None
801         self.async.addCallbacks(_get, eventually_errback(d))
802         d.addBoth(_convert_error, request)
803         return d
804
805     def setAttrs(self, attrs):
806         request = ".setAttrs(attrs) %r" % (attrs,)
807         self.log(request, level=OPERATIONAL)
808
809         if not (self.flags & FXF_WRITE):
810             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
811             return defer.execute(_denied)
812
813         if self.closed:
814             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot set attributes for a closed file handle")
815             return defer.execute(_closed)
816
817         if not "size" in attrs:
818             return defer.succeed(None)
819
820         size = attrs["size"]
821         if not isinstance(size, (int, long)) or size < 0:
822             def _bad(): raise SFTPError(FX_BAD_MESSAGE, "new size is not a valid nonnegative integer")
823             return defer.execute(_bad)
824
825         d = defer.Deferred()
826         def _resize(ign):
827             self.consumer.set_current_size(size)
828             eventually_callback(d)(None)
829             return None
830         self.async.addCallbacks(_resize, eventually_errback(d))
831         d.addBoth(_convert_error, request)
832         return d
833
834
835 class StoppableList:
836     def __init__(self, items):
837         self.items = items
838     def __iter__(self):
839         for i in self.items:
840             yield i
841     def close(self):
842         pass
843
844
845 class Reason:
846     def __init__(self, value):
847         self.value = value
848
849
850 # For each immutable file that has been opened with write flags
851 # (FXF_WRITE and/or FXF_CREAT) and is still open, this maps from
852 # parent_write_uri + "/" + childname_utf8, to (list_of_ISFTPFile, open_time_utc).
853 # Updates to this dict are single-threaded.
854
855 all_open_files = {}
856
857 class SFTPUserHandler(ConchUser, PrefixingLogMixin):
858     implements(ISFTPServer)
859     def __init__(self, client, rootnode, username):
860         ConchUser.__init__(self)
861         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
862         if noisy: self.log(".__init__(%r, %r, %r)" % (client, rootnode, username), level=NOISY)
863
864         self.channelLookup["session"] = session.SSHSession
865         self.subsystemLookup["sftp"] = FileTransferServer
866
867         self._client = client
868         self._root = rootnode
869         self._username = username
870         self._convergence = client.convergence
871         self._logged_out = False
872         self._open_files = {}  # files created by this user handler and still open
873
874     def gotVersion(self, otherVersion, extData):
875         self.log(".gotVersion(%r, %r)" % (otherVersion, extData), level=OPERATIONAL)
876
877         # advertise the same extensions as the OpenSSH SFTP server
878         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
879         return {'extposix-rename@openssh.com': '1',
880                 'statvfs@openssh.com': '2',
881                 'fstatvfs@openssh.com': '2',
882                }
883
884     def _add_open_files(self, direntry, files_to_add):
885         if direntry:
886             if direntry in self._open_files:
887                 self._open_files[direntry] += files_to_add
888             else:
889                 self._open_files[direntry] = files_to_add
890
891             if direntry in all_open_files:
892                 (old_files, opentime) = all_open_files[direntry]
893                 all_open_files[direntry] = (old_files + files_to_add, opentime)
894             else:
895                 all_open_files[direntry] = (files_to_add, time())
896
897     def _remove_open_files(self, direntry, files_to_remove):
898         if direntry and not self._logged_out:
899             assert direntry in self._open_files, (direntry, self._open_files)
900             assert direntry in all_open_files, (direntry, all_open_files)
901
902             old_files = self._open_files[direntry]
903             new_files = [f for f in old_files if f not in files_to_remove]
904             if len(new_files) > 0:
905                 self._open_files[direntry] = new_files
906             else:
907                 del self._open_files[direntry]
908
909             (all_old_files, opentime) = all_open_files[direntry]
910             all_new_files = [f for f in all_old_files if f not in files_to_remove]
911             if len(all_new_files) > 0:
912                 all_open_files[direntry] = (all_new_files, opentime)
913             else:
914                 del all_open_files[direntry]
915
916     def _rename_open_files(self, from_parent, from_childname, to_parent, to_childname):
917         """When an direntry is renamed, any open files for that direntry are also renamed.
918         Return True if there were any open files at from_direntry."""
919
920         from_direntry = self._direntry_for(from_parent, from_childname)
921         to_direntry = self._direntry_for(to_parent, to_childname)
922
923         if from_direntry in all_open_files:
924             (from_files, opentime) = all_open_files[from_direntry]
925             del self._open_files[from_direntry]
926             del all_open_files[from_direntry]
927             for file in from_files:
928                 file.rename(to_parent, to_childname)
929             self._add_open_files(to_direntry, from_files)
930             return True
931         else:
932             return False
933
934     def _direntry_for(self, parent, childname):
935         if parent and childname:
936             rw_uri = parent.get_write_uri()
937             if rw_uri:
938                 return rw_uri + "/" + childname.encode('utf-8')
939
940         return None
941
942     def logout(self):
943         if not self._logged_out:
944             self._logged_out = True
945             for (direntry, files_at_direntry) in enumerate(self._open_files):
946                 self._remove_open_files(direntry, files_at_direntry)
947
948     def _check_abort(self):
949         return self._logged_out
950
951     def _close_notify(self, parent, childname, f):
952         self._remove_open_files(self._direntry_for(parent, childname), [f])
953
954     def _make_file(self, flags, parent=None, childname=None, filenode=None, metadata=None):
955         if noisy: self.log("._make_file(%r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r" %
956                            (flags, _repr_flags(flags), parent, childname, filenode, metadata), level=NOISY)
957
958         assert metadata is None or 'readonly' in metadata, metadata
959         writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
960
961         if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
962             return ShortReadOnlySFTPFile(filenode, metadata)
963         else:
964             direntry = None
965             if writing:
966                 direntry = self._direntry_for(parent, childname)
967
968             file = GeneralSFTPFile(self._close_notify, self._check_abort, flags, self._convergence,
969                                    parent=parent, childname=childname, filenode=filenode, metadata=metadata)
970             self._add_open_files(direntry, [file])
971             return file
972
973     def openFile(self, pathstring, flags, attrs):
974         request = ".openFile(%r, %r = %r, %r)" % (pathstring, flags, _repr_flags(flags), attrs)
975         self.log(request, level=OPERATIONAL)
976
977         # This is used for both reading and writing.
978         # First exclude invalid combinations of flags.
979
980         # /usr/bin/sftp 'get' gives us FXF_READ, while 'put' on a new file
981         # gives FXF_WRITE | FXF_CREAT | FXF_TRUNC. I'm guessing that 'put' on an
982         # existing file gives the same.
983
984         if not (flags & (FXF_READ | FXF_WRITE)):
985             raise SFTPError(FX_BAD_MESSAGE,
986                             "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
987
988         if (flags & FXF_EXCL) and not (flags & FXF_CREAT):
989             raise SFTPError(FX_BAD_MESSAGE,
990                             "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT")
991
992         path = self._path_from_string(pathstring)
993         if not path:
994             raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty")
995
996         # The combination of flags is potentially valid. Now there are two major cases:
997         #
998         #  1. The path is specified as /uri/FILECAP, with no parent directory.
999         #     If the FILECAP is mutable and writeable, then we can open it in write-only
1000         #     or read/write mode (non-exclusively), otherwise we can only open it in
1001         #     read-only mode. The open should succeed immediately as long as FILECAP is
1002         #     a valid known filecap that grants the required permission.
1003         #
1004         #  2. The path is specified relative to a parent. We find the parent dirnode and
1005         #     get the child's URI and metadata if it exists. There are four subcases:
1006         #       a. the child does not exist: FXF_CREAT must be set, and we must be able
1007         #          to write to the parent directory.
1008         #       b. the child exists but is not a valid known filecap: fail
1009         #       c. the child is mutable: if we are trying to open it write-only or
1010         #          read/write, then we must be able to write to the file.
1011         #       d. the child is immutable: if we are trying to open it write-only or
1012         #          read/write, then we must be able to write to the parent directory.
1013         #
1014         # To reduce latency, open succeeds as soon as these conditions are met, even
1015         # though there might be a failure in downloading the existing file or uploading
1016         # a new one.
1017         #
1018         # Note that the permission checks below are for more precise error reporting on
1019         # the open call; later operations would fail even if we did not make these checks.
1020
1021         d = self._get_root(path)
1022         def _got_root( (root, path) ):
1023             if root.is_unknown():
1024                 raise SFTPError(FX_PERMISSION_DENIED,
1025                                 "cannot open an unknown cap (or child of an unknown directory). "
1026                                 "Upgrading the gateway to a later Tahoe-LAFS version may help")
1027             if not path:
1028                 # case 1
1029                 if noisy: self.log("case 1: root = %r, path[:-1] = %r" % (root, path[:-1]), level=NOISY)
1030                 if not IFileNode.providedBy(root):
1031                     raise SFTPError(FX_PERMISSION_DENIED,
1032                                     "cannot open a directory cap")
1033                 if (flags & FXF_WRITE) and root.is_readonly():
1034                     raise SFTPError(FX_PERMISSION_DENIED,
1035                                     "cannot write to a non-writeable filecap without a parent directory")
1036                 if flags & FXF_EXCL:
1037                     raise SFTPError(FX_FAILURE,
1038                                     "cannot create a file exclusively when it already exists")
1039
1040                 # The file does not need to be added to all_open_files, because it is not
1041                 # associated with a directory entry that needs to be updated.
1042
1043                 return self._make_file(flags, filenode=root)
1044             else:
1045                 # case 2
1046                 childname = path[-1]
1047                 if noisy: self.log("case 2: root = %r, childname = %r, path[:-1] = %r" %
1048                                    (root, childname, path[:-1]), level=NOISY)
1049                 d2 = root.get_child_at_path(path[:-1])
1050                 def _got_parent(parent):
1051                     if noisy: self.log("_got_parent(%r)" % (parent,), level=NOISY)
1052                     if parent.is_unknown():
1053                         raise SFTPError(FX_PERMISSION_DENIED,
1054                                         "cannot open an unknown cap (or child of an unknown directory). "
1055                                         "Upgrading the gateway to a later Tahoe-LAFS version may help")
1056
1057                     parent_readonly = parent.is_readonly()
1058                     d3 = defer.succeed(None)
1059                     if flags & FXF_EXCL:
1060                         # FXF_EXCL means that the link to the file (not the file itself) must
1061                         # be created atomically wrt updates by this storage client.
1062                         # That is, we need to create the link before returning success to the
1063                         # SFTP open request (and not just on close, as would normally be the
1064                         # case). We make the link initially point to a zero-length LIT file,
1065                         # which is consistent with what might happen on a POSIX filesystem.
1066
1067                         if parent_readonly:
1068                             raise SFTPError(FX_FAILURE,
1069                                             "cannot create a file exclusively when the parent directory is read-only")
1070
1071                         # 'overwrite=False' ensures failure if the link already exists.
1072                         # FIXME: should use a single call to set_uri and return (child, metadata) (#1035)
1073
1074                         zero_length_lit = "URI:LIT:"
1075                         if noisy: self.log("%r.set_uri(%r, None, readcap=%r, overwrite=False)" %
1076                                            (parent, zero_length_lit, childname), level=NOISY)
1077                         d3.addCallback(lambda ign: parent.set_uri(childname, None, readcap=zero_length_lit, overwrite=False))
1078                         def _seturi_done(child):
1079                             if noisy: self.log("%r.get_metadata_for(%r)" % (parent, childname), level=NOISY)
1080                             d4 = parent.get_metadata_for(childname)
1081                             d4.addCallback(lambda metadata: (child, metadata))
1082                             return d4
1083                         d3.addCallback(_seturi_done)
1084                     else:
1085                         if noisy: self.log("%r.get_child_and_metadata(%r)" % (parent, childname), level=NOISY)
1086                         d3.addCallback(lambda ign: parent.get_child_and_metadata(childname))
1087
1088                     def _got_child( (filenode, metadata) ):
1089                         if noisy: self.log("_got_child( (%r, %r) )" % (filenode, metadata), level=NOISY)
1090
1091                         if filenode.is_unknown():
1092                             raise SFTPError(FX_PERMISSION_DENIED,
1093                                             "cannot open an unknown cap. Upgrading the gateway "
1094                                             "to a later Tahoe-LAFS version may help")
1095                         if not IFileNode.providedBy(filenode):
1096                             raise SFTPError(FX_PERMISSION_DENIED,
1097                                             "cannot open a directory as if it were a file")
1098                         if (flags & FXF_WRITE) and filenode.is_mutable() and filenode.is_readonly():
1099                             raise SFTPError(FX_PERMISSION_DENIED,
1100                                             "cannot open a read-only mutable file for writing")
1101                         if (flags & FXF_WRITE) and parent_readonly:
1102                             raise SFTPError(FX_PERMISSION_DENIED,
1103                                             "cannot open a file for writing when the parent directory is read-only")
1104
1105                         metadata['readonly'] = _is_readonly(parent_readonly, filenode)
1106                         return self._make_file(flags, parent=parent, childname=childname, filenode=filenode, metadata=metadata)
1107                     def _no_child(f):
1108                         if noisy: self.log("_no_child(%r)" % (f,), level=NOISY)
1109                         f.trap(NoSuchChildError)
1110
1111                         if not (flags & FXF_CREAT):
1112                             raise SFTPError(FX_NO_SUCH_FILE,
1113                                             "the file does not exist, and was not opened with the creation (CREAT) flag")
1114                         if parent_readonly:
1115                             raise SFTPError(FX_PERMISSION_DENIED,
1116                                             "cannot create a file when the parent directory is read-only")
1117
1118                         return self._make_file(flags, parent=parent, childname=childname)
1119                     d3.addCallbacks(_got_child, _no_child)
1120                     return d3
1121
1122                 d2.addCallback(_got_parent)
1123                 return d2
1124
1125         d.addCallback(_got_root)
1126         d.addBoth(_convert_error, request)
1127         return d
1128
1129     def renameFile(self, from_pathstring, to_pathstring, overwrite=False):
1130         request = ".renameFile(%r, %r)" % (from_pathstring, to_pathstring)
1131         self.log(request, level=OPERATIONAL)
1132
1133         from_path = self._path_from_string(from_pathstring)
1134         to_path = self._path_from_string(to_pathstring)
1135
1136         # the target directory must already exist
1137         d = deferredutil.gatherResults([self._get_parent_or_node(from_path),
1138                                         self._get_parent_or_node(to_path)])
1139         def _got( (from_pair, to_pair) ):
1140             if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r)" %
1141                                (from_pair, to_pair, from_pathstring, to_pathstring), level=NOISY)
1142             (from_parent, from_childname) = from_pair
1143             (to_parent, to_childname) = to_pair
1144             
1145             if from_childname is None:
1146                 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename a source object specified by URI")
1147             if to_childname is None:
1148                 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename to a destination specified by URI")
1149
1150             # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.5>
1151             # "It is an error if there already exists a file with the name specified
1152             #  by newpath."
1153             # For the standard SSH_FXP_RENAME operation, overwrite=False.
1154             # We also support the extposix-rename@openssh.com extension, which uses overwrite=True.
1155
1156             # FIXME: use move_child_to_path to avoid possible data loss due to #943
1157             #d2 = from_parent.move_child_to_path(from_childname, to_root, to_path, overwrite=overwrite)
1158
1159             d2 = from_parent.move_child_to(from_childname, to_parent, to_childname, overwrite=overwrite)
1160             def _check(err):
1161                 if noisy: self.log("_check(%r) in .renameFile(%r, %r)" %
1162                                    (err, from_pathstring, to_pathstring), level=NOISY)
1163
1164                 if not isinstance(err, Failure) or err.check(NoSuchChildError):
1165                     # If there are open files to be written at the 'from' direntry, then ensure
1166                     # they will now be written at the 'to' direntry instead.
1167                     if noisy: self.log("checking open files:\nself._open_files = %r\nall_open_files = %r" %
1168                                        (self._open_files, all_open_files), level=NOISY)
1169                     if self._rename_open_files(from_parent, from_childname, to_parent, to_childname):
1170                         # suppress the NoSuchChildError if any open files were renamed
1171                         if noisy: self.log("after renaming:\nself._open_files = %r\nall_open_files = %r" %
1172                                            (self._open_files, all_open_files), level=NOISY)
1173                         return None
1174                 elif err.check(ExistingChildError):
1175                     # OpenSSH SFTP server returns FX_PERMISSION_DENIED
1176                     raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_pathstring)
1177
1178                 return err
1179             d2.addBoth(_check)
1180             return d2
1181         d.addCallback(_got)
1182         d.addBoth(_convert_error, request)
1183         return d
1184
1185     def makeDirectory(self, pathstring, attrs):
1186         request = ".makeDirectory(%r, %r)" % (pathstring, attrs)
1187         self.log(request, level=OPERATIONAL)
1188
1189         path = self._path_from_string(pathstring)
1190         metadata = self._attrs_to_metadata(attrs)
1191         d = self._get_root(path)
1192         d.addCallback(lambda (root, path):
1193                       self._get_or_create_directories(root, path, metadata))
1194         d.addBoth(_convert_error, request)
1195         return d
1196
1197     def _get_or_create_directories(self, node, path, metadata):
1198         if not IDirectoryNode.providedBy(node):
1199             # TODO: provide the name of the blocking file in the error message.
1200             def _blocked(): raise SFTPError(FX_FAILURE, "cannot create directory because there "
1201                                                         "is a file in the way") # close enough
1202             return defer.execute(_blocked)
1203
1204         if not path:
1205             return defer.succeed(node)
1206         d = node.get(path[0])
1207         def _maybe_create(f):
1208             f.trap(NoSuchChildError)
1209             return node.create_subdirectory(path[0])
1210         d.addErrback(_maybe_create)
1211         d.addCallback(self._get_or_create_directories, path[1:], metadata)
1212         return d
1213
1214     def removeFile(self, pathstring):
1215         request = ".removeFile(%r)" % (pathstring,)
1216         self.log(request, level=OPERATIONAL)
1217
1218         path = self._path_from_string(pathstring)
1219         d = self._remove_object(path, must_be_file=True)
1220         d.addBoth(_convert_error, request)
1221         return d
1222
1223     def removeDirectory(self, pathstring):
1224         request = ".removeDirectory(%r)" % (pathstring,)
1225         self.log(request, level=OPERATIONAL)
1226
1227         path = self._path_from_string(pathstring)
1228         d = self._remove_object(path, must_be_directory=True)
1229         d.addBoth(_convert_error, request)
1230         return d
1231
1232     def _remove_object(self, path, must_be_directory=False, must_be_file=False):
1233         d = defer.maybeDeferred(self._get_parent_or_node, path)
1234         def _got_parent( (parent, childname) ):
1235             # FIXME (minor): there is a race condition between the 'get' and 'delete',
1236             # so it is possible that the must_be_directory or must_be_file restrictions
1237             # might not be enforced correctly if the type has just changed.
1238
1239             if childname is None:
1240                 raise SFTPError(FX_NO_SUCH_FILE, "cannot delete an object specified by URI")
1241
1242             d2 = parent.get(childname)
1243             def _got_child(child):
1244                 # Unknown children can be removed by either removeFile or removeDirectory.
1245                 if must_be_directory and IFileNode.providedBy(child):
1246                     raise SFTPError(FX_PERMISSION_DENIED, "rmdir called on a file")
1247                 if must_be_file and IDirectoryNode.providedBy(child):
1248                     raise SFTPError(FX_PERMISSION_DENIED, "rmfile called on a directory")
1249                 return parent.delete(childname)
1250             d2.addCallback(_got_child)
1251             return d2
1252         d.addCallback(_got_parent)
1253         return d
1254
1255     def openDirectory(self, pathstring):
1256         request = ".openDirectory(%r)" % (pathstring,)
1257         self.log(request, level=OPERATIONAL)
1258
1259         path = self._path_from_string(pathstring)
1260         d = self._get_parent_or_node(path)
1261         def _got_parent_or_node( (parent_or_node, childname) ):
1262             if noisy: self.log("_got_parent_or_node( (%r, %r) ) in openDirectory(%r)" %
1263                                (parent_or_node, childname, pathstring), level=NOISY)
1264             if childname is None:
1265                 return parent_or_node
1266             else:
1267                 return parent_or_node.get(childname)
1268         d.addCallback(_got_parent_or_node)
1269         def _list(dirnode):
1270             if dirnode.is_unknown():
1271                 raise SFTPError(FX_PERMISSION_DENIED,
1272                                 "cannot list an unknown cap as a directory. Upgrading the gateway "
1273                                 "to a later Tahoe-LAFS version may help")
1274             if not IDirectoryNode.providedBy(dirnode):
1275                 raise SFTPError(FX_PERMISSION_DENIED,
1276                                 "cannot list a file as if it were a directory")
1277
1278             d2 = dirnode.list()
1279             def _render(children):
1280                 parent_readonly = dirnode.is_readonly()
1281                 results = []
1282                 for filename, (child, metadata) in children.iteritems():
1283                     # The file size may be cached or absent.
1284                     metadata['readonly'] = _is_readonly(parent_readonly, child)
1285                     attrs = _populate_attrs(child, metadata)
1286                     filename_utf8 = filename.encode('utf-8')
1287                     longname = _lsLine(filename_utf8, attrs)
1288                     results.append( (filename_utf8, longname, attrs) )
1289                 return StoppableList(results)
1290             d2.addCallback(_render)
1291             return d2
1292         d.addCallback(_list)
1293         d.addBoth(_convert_error, request)
1294         return d
1295
1296     def getAttrs(self, pathstring, followLinks):
1297         request = ".getAttrs(%r, followLinks=%r)" % (pathstring, followLinks)
1298         self.log(request, level=OPERATIONAL)
1299
1300         # When asked about a specific file, report its current size.
1301         # TODO: the modification time for a mutable file should be
1302         # reported as the update time of the best version. But that
1303         # information isn't currently stored in mutable shares, I think.
1304
1305         path = self._path_from_string(pathstring)
1306         d = self._get_parent_or_node(path)
1307         def _got_parent_or_node( (parent_or_node, childname) ):
1308             if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
1309             if childname is None:
1310                 node = parent_or_node
1311                 d2 = node.get_current_size()
1312                 d2.addCallback(lambda size:
1313                                _populate_attrs(node, {'readonly': node.is_unknown() or node.is_readonly()}, size=size))
1314                 return d2
1315             else:
1316                 parent = parent_or_node
1317                 d2 = parent.get_child_and_metadata_at_path([childname])
1318                 def _got( (child, metadata) ):
1319                     if noisy: self.log("_got( (%r, %r) )" % (child, metadata), level=NOISY)
1320                     assert IDirectoryNode.providedBy(parent), parent
1321                     metadata['readonly'] = _is_readonly(parent.is_readonly(), child)
1322                     d3 = child.get_current_size()
1323                     d3.addCallback(lambda size: _populate_attrs(child, metadata, size=size))
1324                     return d3
1325                 def _nosuch(err):
1326                     if noisy: self.log("_nosuch(%r)" % (err,), level=NOISY)
1327                     err.trap(NoSuchChildError)
1328                     direntry = self._direntry_for(parent, childname)
1329                     if noisy: self.log("checking open files:\nself._open_files = %r\nall_open_files = %r\ndirentry=%r" %
1330                                        (self._open_files, all_open_files, direntry), level=NOISY)
1331                     if direntry in all_open_files:
1332                         (files, opentime) = all_open_files[direntry]
1333                         # A file that has been opened for writing necessarily has permissions rw-rw-rw-.
1334                         return {'permissions': S_IFREG | 0666,
1335                                 'size': 0,
1336                                 'createtime': opentime,
1337                                 'ctime': opentime,
1338                                 'mtime': opentime,
1339                                 'atime': opentime,
1340                                }
1341                     return err
1342                 d2.addCallbacks(_got, _nosuch)
1343                 return d2
1344         d.addCallback(_got_parent_or_node)
1345         d.addBoth(_convert_error, request)
1346         return d
1347
1348     def setAttrs(self, pathstring, attrs):
1349         self.log(".setAttrs(%r, %r)" % (pathstring, attrs), level=OPERATIONAL)
1350
1351         if "size" in attrs:
1352             # this would require us to download and re-upload the truncated/extended
1353             # file contents
1354             def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "setAttrs wth size attribute")
1355             return defer.execute(_unsupported)
1356         return defer.succeed(None)
1357
1358     def readLink(self, pathstring):
1359         self.log(".readLink(%r)" % (pathstring,), level=OPERATIONAL)
1360
1361         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "readLink")
1362         return defer.execute(_unsupported)
1363
1364     def makeLink(self, linkPathstring, targetPathstring):
1365         self.log(".makeLink(%r, %r)" % (linkPathstring, targetPathstring), level=OPERATIONAL)
1366
1367         # If this is implemented, note the reversal of arguments described in point 7 of
1368         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>.
1369
1370         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "makeLink")
1371         return defer.execute(_unsupported)
1372
1373     def extendedRequest(self, extensionName, extensionData):
1374         self.log(".extendedRequest(%r, <data of length %r>)" % (extensionName, len(extensionData)), level=OPERATIONAL)
1375
1376         # We implement the three main OpenSSH SFTP extensions; see
1377         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
1378
1379         if extensionName == 'extposix-rename@openssh.com':
1380             def _bad(): raise SFTPError(FX_BAD_MESSAGE, "could not parse extposix-rename@openssh.com request")
1381
1382             (fromPathLen,) = struct.unpack('>L', extensionData[0:4])
1383             if 8 + fromPathLen > len(extensionData): return defer.execute(_bad)
1384
1385             (toPathLen,) = struct.unpack('>L', extensionData[(4 + fromPathLen):(8 + fromPathLen)])
1386             if 8 + fromPathLen + toPathLen != len(extensionData): return defer.execute(_bad)
1387
1388             fromPathstring = extensionData[4:(4 + fromPathLen)]
1389             toPathstring = extensionData[(8 + fromPathLen):]
1390             return self.renameFile(fromPathstring, toPathstring, overwrite=True)
1391
1392         if extensionName == 'statvfs@openssh.com' or extensionName == 'fstatvfs@openssh.com':
1393             return defer.succeed(struct.pack('>11Q',
1394                 1024,         # uint64  f_bsize     /* file system block size */
1395                 1024,         # uint64  f_frsize    /* fundamental fs block size */
1396                 628318530,    # uint64  f_blocks    /* number of blocks (unit f_frsize) */
1397                 314159265,    # uint64  f_bfree     /* free blocks in file system */
1398                 314159265,    # uint64  f_bavail    /* free blocks for non-root */
1399                 200000000,    # uint64  f_files     /* total file inodes */
1400                 100000000,    # uint64  f_ffree     /* free file inodes */
1401                 100000000,    # uint64  f_favail    /* free file inodes for non-root */
1402                 0x1AF5,       # uint64  f_fsid      /* file system id */
1403                 2,            # uint64  f_flag      /* bit mask = ST_NOSUID; not ST_RDONLY */
1404                 65535,        # uint64  f_namemax   /* maximum filename length */
1405                 ))
1406
1407         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "unsupported %r request <data of length %r>" %
1408                                                                (extensionName, len(extensionData)))
1409         return defer.execute(_unsupported)
1410
1411     def realPath(self, pathstring):
1412         self.log(".realPath(%r)" % (pathstring,), level=OPERATIONAL)
1413
1414         path_utf8 = [p.encode('utf-8') for p in self._path_from_string(pathstring)]
1415         return "/" + "/".join(path_utf8)
1416
1417     def _path_from_string(self, pathstring):
1418         if noisy: self.log("CONVERT %r" % (pathstring,), level=NOISY)
1419
1420         # The home directory is the root directory.
1421         pathstring = pathstring.strip("/")
1422         if pathstring == "" or pathstring == ".":
1423             path_utf8 = []
1424         else:
1425             path_utf8 = pathstring.split("/")
1426
1427         # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.2>
1428         # "Servers SHOULD interpret a path name component ".." as referring to
1429         #  the parent directory, and "." as referring to the current directory."
1430         path = []
1431         for p_utf8 in path_utf8:
1432             if p_utf8 == "..":
1433                 # ignore excess .. components at the root
1434                 if len(path) > 0:
1435                     path = path[:-1]
1436             elif p_utf8 != ".":
1437                 try:
1438                     p = p_utf8.decode('utf-8', 'strict')
1439                 except UnicodeError:
1440                     raise SFTPError(FX_NO_SUCH_FILE, "path could not be decoded as UTF-8")
1441                 path.append(p)
1442
1443         if noisy: self.log(" PATH %r" % (path,), level=NOISY)
1444         return path
1445
1446     def _get_root(self, path):
1447         # return Deferred (root, remaining_path)
1448         if path and path[0] == u"uri":
1449             d = defer.maybeDeferred(self._client.create_node_from_uri, path[1].encode('utf-8'))
1450             d.addCallback(lambda root: (root, path[2:]))
1451         else:
1452             d = defer.succeed((self._root, path))
1453         return d
1454
1455     def _get_parent_or_node(self, path):
1456         # return Deferred (parent, childname) or (node, None)
1457         d = self._get_root(path)
1458         def _got_root( (root, remaining_path) ):
1459             if not remaining_path:
1460                 return (root, None)
1461             else:
1462                 d2 = root.get_child_at_path(remaining_path[:-1])
1463                 d2.addCallback(lambda parent: (parent, remaining_path[-1]))
1464                 return d2
1465         d.addCallback(_got_root)
1466         return d
1467
1468     def _attrs_to_metadata(self, attrs):
1469         metadata = {}
1470
1471         for key in attrs:
1472             if key == "mtime" or key == "ctime" or key == "createtime":
1473                 metadata[key] = long(attrs[key])
1474             elif key.startswith("ext_"):
1475                 metadata[key] = str(attrs[key])
1476
1477         return metadata
1478
1479
1480 class SFTPUser(ConchUser, PrefixingLogMixin):
1481     implements(ISession)
1482     def __init__(self, check_abort, client, rootnode, username, convergence):
1483         ConchUser.__init__(self)
1484         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
1485
1486         self.channelLookup["session"] = session.SSHSession
1487         self.subsystemLookup["sftp"] = FileTransferServer
1488
1489         self.check_abort = check_abort
1490         self.client = client
1491         self.root = rootnode
1492         self.username = username
1493         self.convergence = convergence
1494
1495     def getPty(self, terminal, windowSize, attrs):
1496         self.log(".getPty(%r, %r, %r)" % (terminal, windowSize, attrs), level=OPERATIONAL)
1497         raise NotImplementedError
1498
1499     def openShell(self, protocol):
1500         self.log(".openShell(%r)" % (protocol,), level=OPERATIONAL)
1501         raise NotImplementedError
1502
1503     def execCommand(self, protocol, cmd):
1504         self.log(".execCommand(%r, %r)" % (protocol, cmd), level=OPERATIONAL)
1505         raise NotImplementedError
1506
1507     def windowChanged(self, newWindowSize):
1508         self.log(".windowChanged(%r)" % (newWindowSize,), level=OPERATIONAL)
1509
1510     def eofReceived():
1511         self.log(".eofReceived()", level=OPERATIONAL)
1512
1513     def closed(self):
1514         self.log(".closed()", level=OPERATIONAL)
1515
1516
1517 # if you have an SFTPUser, and you want something that provides ISFTPServer,
1518 # then you get SFTPHandler(user)
1519 components.registerAdapter(SFTPHandler, SFTPUser, ISFTPServer)
1520
1521 from auth import AccountURLChecker, AccountFileChecker, NeedRootcapLookupScheme
1522
1523 class Dispatcher:
1524     implements(portal.IRealm)
1525     def __init__(self, client):
1526         self._client = client
1527
1528     def requestAvatar(self, avatarID, mind, interface):
1529         assert interface == IConchUser, interface
1530         rootnode = self._client.create_node_from_uri(avatarID.rootcap)
1531         handler = SFTPUserHandler(self._client, rootnode, avatarID.username)
1532         return (interface, handler, handler.logout)
1533
1534
1535 class SFTPServer(service.MultiService):
1536     def __init__(self, client, accountfile, accounturl,
1537                  sftp_portstr, pubkey_file, privkey_file):
1538         service.MultiService.__init__(self)
1539
1540         r = Dispatcher(client)
1541         p = portal.Portal(r)
1542
1543         if accountfile:
1544             c = AccountFileChecker(self, accountfile)
1545             p.registerChecker(c)
1546         if accounturl:
1547             c = AccountURLChecker(self, accounturl)
1548             p.registerChecker(c)
1549         if not accountfile and not accounturl:
1550             # we could leave this anonymous, with just the /uri/CAP form
1551             raise NeedRootcapLookupScheme("must provide an account file or URL")
1552
1553         pubkey = keys.Key.fromFile(pubkey_file)
1554         privkey = keys.Key.fromFile(privkey_file)
1555         class SSHFactory(factory.SSHFactory):
1556             publicKeys = {pubkey.sshType(): pubkey}
1557             privateKeys = {privkey.sshType(): privkey}
1558             def getPrimes(self):
1559                 try:
1560                     # if present, this enables diffie-hellman-group-exchange
1561                     return primes.parseModuliFile("/etc/ssh/moduli")
1562                 except IOError:
1563                     return None
1564
1565         f = SSHFactory()
1566         f.portal = p
1567
1568         s = strports.service(sftp_portstr, f)
1569         s.setServiceParent(self)