]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/sftpd.py
59952a994744ebdc95f7307e07b8336d84b634c7
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / sftpd.py
1
2 import heapq, traceback, array, stat, struct
3 from types import NoneType
4 from stat import S_IFREG, S_IFDIR
5 from time import time, strftime, localtime
6
7 from zope.interface import implements
8 from twisted.python import components
9 from twisted.application import service, strports
10 from twisted.conch.ssh import factory, keys, session
11 from twisted.conch.ssh.filetransfer import FileTransferServer, SFTPError, \
12      FX_NO_SUCH_FILE, FX_OP_UNSUPPORTED, FX_PERMISSION_DENIED, FX_EOF, \
13      FX_BAD_MESSAGE, FX_FAILURE, FX_OK
14 from twisted.conch.ssh.filetransfer import FXF_READ, FXF_WRITE, FXF_APPEND, \
15      FXF_CREAT, FXF_TRUNC, FXF_EXCL
16 from twisted.conch.interfaces import ISFTPServer, ISFTPFile, IConchUser, ISession
17 from twisted.conch.avatar import ConchUser
18 from twisted.conch.openssh_compat import primes
19 from twisted.cred import portal
20 from twisted.internet.error import ProcessDone, ProcessTerminated
21 from twisted.python.failure import Failure
22 from twisted.internet.interfaces import ITransport
23
24 from twisted.internet import defer
25 from twisted.internet.interfaces import IFinishableConsumer
26 from foolscap.api import eventually
27 from allmydata.util import deferredutil
28
29 from allmydata.util.consumer import download_to_data
30 from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
31      NoSuchChildError, ChildOfWrongTypeError
32 from allmydata.mutable.common import NotWriteableError
33 from allmydata.mutable.publish import MutableFileHandle
34 from allmydata.immutable.upload import FileHandle
35 from allmydata.dirnode import update_metadata
36 from allmydata.util.fileutil import EncryptedTemporaryFile
37
38 noisy = True
39 use_foolscap_logging = True
40
41 from allmydata.util.log import NOISY, OPERATIONAL, WEIRD, \
42     msg as _msg, err as _err, PrefixingLogMixin as _PrefixingLogMixin
43
44 if use_foolscap_logging:
45     (logmsg, logerr, PrefixingLogMixin) = (_msg, _err, _PrefixingLogMixin)
46 else:  # pragma: no cover
47     def logmsg(s, level=None):
48         print s
49     def logerr(s, level=None):
50         print s
51     class PrefixingLogMixin:
52         def __init__(self, facility=None, prefix=''):
53             self.prefix = prefix
54         def log(self, s, level=None):
55             print "%r %s" % (self.prefix, s)
56
57
58 def eventually_callback(d):
59     return lambda res: eventually(d.callback, res)
60
61 def eventually_errback(d):
62     return lambda err: eventually(d.errback, err)
63
64
65 def _utf8(x):
66     if isinstance(x, unicode):
67         return x.encode('utf-8')
68     if isinstance(x, str):
69         return x
70     return repr(x)
71
72
73 def _to_sftp_time(t):
74     """SFTP times are unsigned 32-bit integers representing UTC seconds
75     (ignoring leap seconds) since the Unix epoch, January 1 1970 00:00 UTC.
76     A Tahoe time is the corresponding float."""
77     return long(t) & 0xFFFFFFFFL
78
79
80 def _convert_error(res, request):
81     """If res is not a Failure, return it, otherwise reraise the appropriate
82     SFTPError."""
83
84     if not isinstance(res, Failure):
85         logged_res = res
86         if isinstance(res, str): logged_res = "<data of length %r>" % (len(res),)
87         logmsg("SUCCESS %r %r" % (request, logged_res,), level=OPERATIONAL)
88         return res
89
90     err = res
91     logmsg("RAISE %r %r" % (request, err.value), level=OPERATIONAL)
92     try:
93         if noisy: logmsg(traceback.format_exc(err.value), level=NOISY)
94     except Exception:  # pragma: no cover
95         pass
96
97     # The message argument to SFTPError must not reveal information that
98     # might compromise anonymity, if we are running over an anonymous network.
99
100     if err.check(SFTPError):
101         # original raiser of SFTPError has responsibility to ensure anonymity
102         raise err
103     if err.check(NoSuchChildError):
104         childname = _utf8(err.value.args[0])
105         raise SFTPError(FX_NO_SUCH_FILE, childname)
106     if err.check(NotWriteableError) or err.check(ChildOfWrongTypeError):
107         msg = _utf8(err.value.args[0])
108         raise SFTPError(FX_PERMISSION_DENIED, msg)
109     if err.check(ExistingChildError):
110         # Versions of SFTP after v3 (which is what twisted.conch implements)
111         # define a specific error code for this case: FX_FILE_ALREADY_EXISTS.
112         # However v3 doesn't; instead, other servers such as sshd return
113         # FX_FAILURE. The gvfs SFTP backend, for example, depends on this
114         # to translate the error to the equivalent of POSIX EEXIST, which is
115         # necessary for some picky programs (such as gedit).
116         msg = _utf8(err.value.args[0])
117         raise SFTPError(FX_FAILURE, msg)
118     if err.check(NotImplementedError):
119         raise SFTPError(FX_OP_UNSUPPORTED, _utf8(err.value))
120     if err.check(EOFError):
121         raise SFTPError(FX_EOF, "end of file reached")
122     if err.check(defer.FirstError):
123         _convert_error(err.value.subFailure, request)
124
125     # We assume that the error message is not anonymity-sensitive.
126     raise SFTPError(FX_FAILURE, _utf8(err.value))
127
128
129 def _repr_flags(flags):
130     return "|".join([f for f in
131                      [(flags & FXF_READ)   and "FXF_READ"   or None,
132                       (flags & FXF_WRITE)  and "FXF_WRITE"  or None,
133                       (flags & FXF_APPEND) and "FXF_APPEND" or None,
134                       (flags & FXF_CREAT)  and "FXF_CREAT"  or None,
135                       (flags & FXF_TRUNC)  and "FXF_TRUNC"  or None,
136                       (flags & FXF_EXCL)   and "FXF_EXCL"   or None,
137                      ]
138                      if f])
139
140
141 def _lsLine(name, attrs):
142     st_uid = "tahoe"
143     st_gid = "tahoe"
144     st_mtime = attrs.get("mtime", 0)
145     st_mode = attrs["permissions"]
146
147     # Some clients won't tolerate '?' in the size field (#1337).
148     st_size = attrs.get("size", 0)
149
150     # We don't know how many links there really are to this object.
151     st_nlink = 1
152
153     # Based on <http://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
154     # We can't call the version in Twisted because we might have a version earlier than
155     # <http://twistedmatrix.com/trac/changeset/25412> (released in Twisted 8.2).
156
157     mode = st_mode
158     perms = array.array('c', '-'*10)
159     ft = stat.S_IFMT(mode)
160     if   stat.S_ISDIR(ft): perms[0] = 'd'
161     elif stat.S_ISREG(ft): perms[0] = '-'
162     else: perms[0] = '?'
163     # user
164     if mode&stat.S_IRUSR: perms[1] = 'r'
165     if mode&stat.S_IWUSR: perms[2] = 'w'
166     if mode&stat.S_IXUSR: perms[3] = 'x'
167     # group
168     if mode&stat.S_IRGRP: perms[4] = 'r'
169     if mode&stat.S_IWGRP: perms[5] = 'w'
170     if mode&stat.S_IXGRP: perms[6] = 'x'
171     # other
172     if mode&stat.S_IROTH: perms[7] = 'r'
173     if mode&stat.S_IWOTH: perms[8] = 'w'
174     if mode&stat.S_IXOTH: perms[9] = 'x'
175     # suid/sgid never set
176
177     l = perms.tostring()
178     l += str(st_nlink).rjust(5) + ' '
179     un = str(st_uid)
180     l += un.ljust(9)
181     gr = str(st_gid)
182     l += gr.ljust(9)
183     sz = str(st_size)
184     l += sz.rjust(8)
185     l += ' '
186     day = 60 * 60 * 24
187     sixmo = day * 7 * 26
188     now = time()
189     if st_mtime + sixmo < now or st_mtime > now + day:
190         # mtime is more than 6 months ago, or more than one day in the future
191         l += strftime("%b %d  %Y ", localtime(st_mtime))
192     else:
193         l += strftime("%b %d %H:%M ", localtime(st_mtime))
194     l += name
195     return l
196
197
198 def _no_write(parent_readonly, child, metadata=None):
199     """Whether child should be listed as having read-only permissions in parent."""
200
201     if child.is_unknown():
202         return True
203     elif child.is_mutable():
204         return child.is_readonly()
205     elif parent_readonly or IDirectoryNode.providedBy(child):
206         return True
207     else:
208         return metadata is not None and metadata.get('no-write', False)
209
210
211 def _populate_attrs(childnode, metadata, size=None):
212     attrs = {}
213
214     # The permissions must have the S_IFDIR (040000) or S_IFREG (0100000)
215     # bits, otherwise the client may refuse to open a directory.
216     # Also, sshfs run as a non-root user requires files and directories
217     # to be world-readable/writeable.
218     # It is important that we never set the executable bits on files.
219     #
220     # Directories and unknown nodes have no size, and SFTP doesn't
221     # require us to make one up.
222     #
223     # childnode might be None, meaning that the file doesn't exist yet,
224     # but we're going to write it later.
225
226     if childnode and childnode.is_unknown():
227         perms = 0
228     elif childnode and IDirectoryNode.providedBy(childnode):
229         perms = S_IFDIR | 0777
230     else:
231         # For files, omit the size if we don't immediately know it.
232         if childnode and size is None:
233             size = childnode.get_size()
234         if size is not None:
235             assert isinstance(size, (int, long)) and not isinstance(size, bool), repr(size)
236             attrs['size'] = size
237         perms = S_IFREG | 0666
238
239     if metadata:
240         if metadata.get('no-write', False):
241             perms &= S_IFDIR | S_IFREG | 0555  # clear 'w' bits
242
243         # See webapi.txt for what these times mean.
244         # We would prefer to omit atime, but SFTP version 3 can only
245         # accept mtime if atime is also set.
246         if 'linkmotime' in metadata.get('tahoe', {}):
247             attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['tahoe']['linkmotime'])
248         elif 'mtime' in metadata:
249             attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['mtime'])
250
251         if 'linkcrtime' in metadata.get('tahoe', {}):
252             attrs['createtime'] = _to_sftp_time(metadata['tahoe']['linkcrtime'])
253
254     attrs['permissions'] = perms
255
256     # twisted.conch.ssh.filetransfer only implements SFTP version 3,
257     # which doesn't include SSH_FILEXFER_ATTR_FLAGS.
258
259     return attrs
260
261
262 def _attrs_to_metadata(attrs):
263     metadata = {}
264
265     for key in attrs:
266         if key == "mtime" or key == "ctime" or key == "createtime":
267             metadata[key] = long(attrs[key])
268         elif key.startswith("ext_"):
269             metadata[key] = str(attrs[key])
270
271     perms = attrs.get('permissions', stat.S_IWUSR)
272     if not (perms & stat.S_IWUSR):
273         metadata['no-write'] = True
274
275     return metadata
276
277
278 def _direntry_for(filenode_or_parent, childname, filenode=None):
279     assert isinstance(childname, (unicode, NoneType)), childname
280
281     if childname is None:
282         filenode_or_parent = filenode
283
284     if filenode_or_parent:
285         rw_uri = filenode_or_parent.get_write_uri()
286         if rw_uri and childname:
287             return rw_uri + "/" + childname.encode('utf-8')
288         else:
289             return rw_uri
290
291     return None
292
293
294 class OverwriteableFileConsumer(PrefixingLogMixin):
295     implements(IFinishableConsumer)
296     """I act both as a consumer for the download of the original file contents, and as a
297     wrapper for a temporary file that records the downloaded data and any overwrites.
298     I use a priority queue to keep track of which regions of the file have been overwritten
299     but not yet downloaded, so that the download does not clobber overwritten data.
300     I use another priority queue to record milestones at which to make callbacks
301     indicating that a given number of bytes have been downloaded.
302
303     The temporary file reflects the contents of the file that I represent, except that:
304      - regions that have neither been downloaded nor overwritten, if present,
305        contain garbage.
306      - the temporary file may be shorter than the represented file (it is never longer).
307        The latter's current size is stored in self.current_size.
308
309     This abstraction is mostly independent of SFTP. Consider moving it, if it is found
310     useful for other frontends."""
311
312     def __init__(self, download_size, tempfile_maker):
313         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
314         if noisy: self.log(".__init__(%r, %r)" % (download_size, tempfile_maker), level=NOISY)
315         self.download_size = download_size
316         self.current_size = download_size
317         self.f = tempfile_maker()
318         self.downloaded = 0
319         self.milestones = []  # empty heap of (offset, d)
320         self.overwrites = []  # empty heap of (start, end)
321         self.is_closed = False
322         self.done = self.when_reached(download_size)  # adds a milestone
323         self.is_done = False
324         def _signal_done(ign):
325             if noisy: self.log("DONE", level=NOISY)
326             self.is_done = True
327         self.done.addCallback(_signal_done)
328         self.producer = None
329
330     def get_file(self):
331         return self.f
332
333     def get_current_size(self):
334         return self.current_size
335
336     def set_current_size(self, size):
337         if noisy: self.log(".set_current_size(%r), current_size = %r, downloaded = %r" %
338                            (size, self.current_size, self.downloaded), level=NOISY)
339         if size < self.current_size or size < self.downloaded:
340             self.f.truncate(size)
341         if size > self.current_size:
342             self.overwrite(self.current_size, "\x00" * (size - self.current_size))
343         self.current_size = size
344
345         # make the invariant self.download_size <= self.current_size be true again
346         if size < self.download_size:
347             self.download_size = size
348
349         if self.downloaded >= self.download_size:
350             self.finish()
351
352     def registerProducer(self, p, streaming):
353         if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY)
354         if self.producer is not None:
355             raise RuntimeError("producer is already registered")
356
357         self.producer = p
358         if streaming:
359             # call resumeProducing once to start things off
360             p.resumeProducing()
361         else:
362             def _iterate():
363                 if not self.is_done:
364                     p.resumeProducing()
365                     eventually(_iterate)
366             _iterate()
367
368     def write(self, data):
369         if noisy: self.log(".write(<data of length %r>)" % (len(data),), level=NOISY)
370         if self.is_closed:
371             return
372
373         if self.downloaded >= self.download_size:
374             return
375
376         next_downloaded = self.downloaded + len(data)
377         if next_downloaded > self.download_size:
378             data = data[:(self.download_size - self.downloaded)]
379
380         while len(self.overwrites) > 0:
381             (start, end) = self.overwrites[0]
382             if start >= next_downloaded:
383                 # This and all remaining overwrites are after the data we just downloaded.
384                 break
385             if start > self.downloaded:
386                 # The data we just downloaded has been partially overwritten.
387                 # Write the prefix of it that precedes the overwritten region.
388                 self.f.seek(self.downloaded)
389                 self.f.write(data[:(start - self.downloaded)])
390
391             # This merges consecutive overwrites if possible, which allows us to detect the
392             # case where the download can be stopped early because the remaining region
393             # to download has already been fully overwritten.
394             heapq.heappop(self.overwrites)
395             while len(self.overwrites) > 0:
396                 (start1, end1) = self.overwrites[0]
397                 if start1 > end:
398                     break
399                 end = end1
400                 heapq.heappop(self.overwrites)
401
402             if end >= next_downloaded:
403                 # This overwrite extends past the downloaded data, so there is no
404                 # more data to consider on this call.
405                 heapq.heappush(self.overwrites, (next_downloaded, end))
406                 self._update_downloaded(next_downloaded)
407                 return
408             elif end >= self.downloaded:
409                 data = data[(end - self.downloaded):]
410                 self._update_downloaded(end)
411
412         self.f.seek(self.downloaded)
413         self.f.write(data)
414         self._update_downloaded(next_downloaded)
415
416     def _update_downloaded(self, new_downloaded):
417         self.downloaded = new_downloaded
418         milestone = new_downloaded
419         if len(self.overwrites) > 0:
420             (start, end) = self.overwrites[0]
421             if start <= new_downloaded and end > milestone:
422                 milestone = end
423
424         while len(self.milestones) > 0:
425             (next, d) = self.milestones[0]
426             if next > milestone:
427                 return
428             if noisy: self.log("MILESTONE %r %r" % (next, d), level=NOISY)
429             heapq.heappop(self.milestones)
430             eventually(d.callback, None)
431
432         if milestone >= self.download_size:
433             self.finish()
434
435     def overwrite(self, offset, data):
436         if noisy: self.log(".overwrite(%r, <data of length %r>)" % (offset, len(data)), level=NOISY)
437         if offset > self.current_size:
438             # Normally writing at an offset beyond the current end-of-file
439             # would leave a hole that appears filled with zeroes. However, an
440             # EncryptedTemporaryFile doesn't behave like that (if there is a
441             # hole in the file on disk, the zeroes that are read back will be
442             # XORed with the keystream). So we must explicitly write zeroes in
443             # the gap between the current EOF and the offset.
444
445             self.f.seek(self.current_size)
446             self.f.write("\x00" * (offset - self.current_size))
447             start = self.current_size
448         else:
449             self.f.seek(offset)
450             start = offset
451
452         self.f.write(data)
453         end = offset + len(data)
454         self.current_size = max(self.current_size, end)
455         if end > self.downloaded:
456             heapq.heappush(self.overwrites, (start, end))
457
458     def read(self, offset, length):
459         """When the data has been read, callback the Deferred that we return with this data.
460         Otherwise errback the Deferred that we return.
461         The caller must perform no more overwrites until the Deferred has fired."""
462
463         if noisy: self.log(".read(%r, %r), current_size = %r" % (offset, length, self.current_size), level=NOISY)
464         if offset >= self.current_size:
465             def _eof(): raise EOFError("read past end of file")
466             return defer.execute(_eof)
467
468         if offset + length > self.current_size:
469             length = self.current_size - offset
470             if noisy: self.log("truncating read to %r bytes" % (length,), level=NOISY)
471
472         needed = min(offset + length, self.download_size)
473         d = self.when_reached(needed)
474         def _reached(ign):
475             # It is not necessarily the case that self.downloaded >= needed, because
476             # the file might have been truncated (thus truncating the download) and
477             # then extended.
478
479             assert self.current_size >= offset + length, (self.current_size, offset, length)
480             if noisy: self.log("self.f = %r" % (self.f,), level=NOISY)
481             self.f.seek(offset)
482             return self.f.read(length)
483         d.addCallback(_reached)
484         return d
485
486     def when_reached(self, index):
487         if noisy: self.log(".when_reached(%r)" % (index,), level=NOISY)
488         if index <= self.downloaded:  # already reached
489             if noisy: self.log("already reached %r" % (index,), level=NOISY)
490             return defer.succeed(None)
491         d = defer.Deferred()
492         def _reached(ign):
493             if noisy: self.log("reached %r" % (index,), level=NOISY)
494             return ign
495         d.addCallback(_reached)
496         heapq.heappush(self.milestones, (index, d))
497         return d
498
499     def when_done(self):
500         return self.done
501
502     def finish(self):
503         """Called by the producer when it has finished producing, or when we have
504         received enough bytes, or as a result of a close. Defined by IFinishableConsumer."""
505
506         while len(self.milestones) > 0:
507             (next, d) = self.milestones[0]
508             if noisy: self.log("MILESTONE FINISH %r %r" % (next, d), level=NOISY)
509             heapq.heappop(self.milestones)
510             # The callback means that the milestone has been reached if
511             # it is ever going to be. Note that the file may have been
512             # truncated to before the milestone.
513             eventually(d.callback, None)
514
515     def close(self):
516         if not self.is_closed:
517             self.is_closed = True
518             try:
519                 self.f.close()
520             except Exception, e:
521                 self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
522         self.finish()
523
524     def unregisterProducer(self):
525         pass
526
527
528 SIZE_THRESHOLD = 1000
529
530
531 class ShortReadOnlySFTPFile(PrefixingLogMixin):
532     implements(ISFTPFile)
533     """I represent a file handle to a particular file on an SFTP connection.
534     I am used only for short immutable files opened in read-only mode.
535     When I am created, the file contents start to be downloaded to memory.
536     self.async is used to delay read requests until the download has finished."""
537
538     def __init__(self, userpath, filenode, metadata):
539         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
540         if noisy: self.log(".__init__(%r, %r, %r)" % (userpath, filenode, metadata), level=NOISY)
541
542         assert isinstance(userpath, str) and IFileNode.providedBy(filenode), (userpath, filenode)
543         self.filenode = filenode
544         self.metadata = metadata
545         self.async = download_to_data(filenode)
546         self.closed = False
547
548     def readChunk(self, offset, length):
549         request = ".readChunk(%r, %r)" % (offset, length)
550         self.log(request, level=OPERATIONAL)
551
552         if self.closed:
553             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
554             return defer.execute(_closed)
555
556         d = defer.Deferred()
557         def _read(data):
558             if noisy: self.log("_read(<data of length %r>) in readChunk(%r, %r)" % (len(data), offset, length), level=NOISY)
559
560             # "In response to this request, the server will read as many bytes as it
561             #  can from the file (up to 'len'), and return them in a SSH_FXP_DATA
562             #  message.  If an error occurs or EOF is encountered before reading any
563             #  data, the server will respond with SSH_FXP_STATUS.  For normal disk
564             #  files, it is guaranteed that this will read the specified number of
565             #  bytes, or up to end of file."
566             #
567             # i.e. we respond with an EOF error iff offset is already at EOF.
568
569             if offset >= len(data):
570                 eventually(d.errback, SFTPError(FX_EOF, "read at or past end of file"))
571             else:
572                 eventually(d.callback, data[offset:offset+length])  # truncated if offset+length > len(data)
573             return data
574         self.async.addCallbacks(_read, eventually_errback(d))
575         d.addBoth(_convert_error, request)
576         return d
577
578     def writeChunk(self, offset, data):
579         self.log(".writeChunk(%r, <data of length %r>) denied" % (offset, len(data)), level=OPERATIONAL)
580
581         def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
582         return defer.execute(_denied)
583
584     def close(self):
585         self.log(".close()", level=OPERATIONAL)
586
587         self.closed = True
588         return defer.succeed(None)
589
590     def getAttrs(self):
591         request = ".getAttrs()"
592         self.log(request, level=OPERATIONAL)
593
594         if self.closed:
595             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
596             return defer.execute(_closed)
597
598         d = defer.execute(_populate_attrs, self.filenode, self.metadata)
599         d.addBoth(_convert_error, request)
600         return d
601
602     def setAttrs(self, attrs):
603         self.log(".setAttrs(%r) denied" % (attrs,), level=OPERATIONAL)
604         def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
605         return defer.execute(_denied)
606
607
608 class GeneralSFTPFile(PrefixingLogMixin):
609     implements(ISFTPFile)
610     """I represent a file handle to a particular file on an SFTP connection.
611     I wrap an instance of OverwriteableFileConsumer, which is responsible for
612     storing the file contents. In order to allow write requests to be satisfied
613     immediately, there is effectively a FIFO queue between requests made to this
614     file handle, and requests to my OverwriteableFileConsumer. This queue is
615     implemented by the callback chain of self.async.
616
617     When first constructed, I am in an 'unopened' state that causes most
618     operations to be delayed until 'open' is called."""
619
620     def __init__(self, userpath, flags, close_notify, convergence):
621         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
622         if noisy: self.log(".__init__(%r, %r = %r, %r, <convergence censored>)" %
623                            (userpath, flags, _repr_flags(flags), close_notify), level=NOISY)
624
625         assert isinstance(userpath, str), userpath
626         self.userpath = userpath
627         self.flags = flags
628         self.close_notify = close_notify
629         self.convergence = convergence
630         self.async = defer.Deferred()
631         # Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created.
632         self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL)
633         self.closed = False
634         self.abandoned = False
635         self.parent = None
636         self.childname = None
637         self.filenode = None
638         self.metadata = None
639
640         # self.consumer should only be relied on in callbacks for self.async, since it might
641         # not be set before then.
642         self.consumer = None
643
644     def open(self, parent=None, childname=None, filenode=None, metadata=None):
645         self.log(".open(parent=%r, childname=%r, filenode=%r, metadata=%r)" %
646                  (parent, childname, filenode, metadata), level=OPERATIONAL)
647
648         assert isinstance(childname, (unicode, NoneType)), childname
649         # If the file has been renamed, the new (parent, childname) takes precedence.
650         if self.parent is None:
651             self.parent = parent
652         if self.childname is None:
653             self.childname = childname
654         self.filenode = filenode
655         self.metadata = metadata
656
657         assert not self.closed, self
658         tempfile_maker = EncryptedTemporaryFile
659
660         if (self.flags & FXF_TRUNC) or not filenode:
661             # We're either truncating or creating the file, so we don't need the old contents.
662             self.consumer = OverwriteableFileConsumer(0, tempfile_maker)
663             self.consumer.finish()
664         else:
665             assert IFileNode.providedBy(filenode), filenode
666
667             self.async.addCallback(lambda ignored: filenode.get_best_readable_version())
668
669             def _read(version):
670                 if noisy: self.log("_read", level=NOISY)
671                 download_size = version.get_size()
672                 assert download_size is not None
673
674                 self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker)
675
676                 version.read(self.consumer, 0, None)
677             self.async.addCallback(_read)
678
679         eventually(self.async.callback, None)
680
681         if noisy: self.log("open done", level=NOISY)
682         return self
683
684     def get_userpath(self):
685         return self.userpath
686
687     def get_direntry(self):
688         return _direntry_for(self.parent, self.childname)
689
690     def rename(self, new_userpath, new_parent, new_childname):
691         self.log(".rename(%r, %r, %r)" % (new_userpath, new_parent, new_childname), level=OPERATIONAL)
692
693         assert isinstance(new_userpath, str) and isinstance(new_childname, unicode), (new_userpath, new_childname)
694         self.userpath = new_userpath
695         self.parent = new_parent
696         self.childname = new_childname
697
698     def abandon(self):
699         self.log(".abandon()", level=OPERATIONAL)
700
701         self.abandoned = True
702
703     def sync(self, ign=None):
704         # The ign argument allows some_file.sync to be used as a callback.
705         self.log(".sync()", level=OPERATIONAL)
706
707         d = defer.Deferred()
708         self.async.addBoth(eventually_callback(d))
709         def _done(res):
710             if noisy: self.log("_done(%r) in .sync()" % (res,), level=NOISY)
711             return res
712         d.addBoth(_done)
713         return d
714
715     def readChunk(self, offset, length):
716         request = ".readChunk(%r, %r)" % (offset, length)
717         self.log(request, level=OPERATIONAL)
718
719         if not (self.flags & FXF_READ):
720             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for reading")
721             return defer.execute(_denied)
722
723         if self.closed:
724             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
725             return defer.execute(_closed)
726
727         d = defer.Deferred()
728         def _read(ign):
729             if noisy: self.log("_read in readChunk(%r, %r)" % (offset, length), level=NOISY)
730             d2 = self.consumer.read(offset, length)
731             d2.addCallbacks(eventually_callback(d), eventually_errback(d))
732             # It is correct to drop d2 here.
733             return None
734         self.async.addCallbacks(_read, eventually_errback(d))
735         d.addBoth(_convert_error, request)
736         return d
737
738     def writeChunk(self, offset, data):
739         self.log(".writeChunk(%r, <data of length %r>)" % (offset, len(data)), level=OPERATIONAL)
740
741         if not (self.flags & FXF_WRITE):
742             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
743             return defer.execute(_denied)
744
745         if self.closed:
746             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")
747             return defer.execute(_closed)
748
749         self.has_changed = True
750
751         # Note that we return without waiting for the write to occur. Reads and
752         # close wait for prior writes, and will fail if any prior operation failed.
753         # This is ok because SFTP makes no guarantee that the write completes
754         # before the request does. In fact it explicitly allows write errors to be
755         # delayed until close:
756         #   "One should note that on some server platforms even a close can fail.
757         #    This can happen e.g. if the server operating system caches writes,
758         #    and an error occurs while flushing cached writes during the close."
759
760         def _write(ign):
761             if noisy: self.log("_write in .writeChunk(%r, <data of length %r>), current_size = %r" %
762                                (offset, len(data), self.consumer.get_current_size()), level=NOISY)
763             # FXF_APPEND means that we should always write at the current end of file.
764             write_offset = offset
765             if self.flags & FXF_APPEND:
766                 write_offset = self.consumer.get_current_size()
767
768             self.consumer.overwrite(write_offset, data)
769             if noisy: self.log("overwrite done", level=NOISY)
770             return None
771         self.async.addCallback(_write)
772         # don't addErrback to self.async, just allow subsequent async ops to fail.
773         return defer.succeed(None)
774
775     def close(self):
776         request = ".close()"
777         self.log(request, level=OPERATIONAL)
778
779         if self.closed:
780             return defer.succeed(None)
781
782         # This means that close has been called, not that the close has succeeded.
783         self.closed = True
784
785         if not (self.flags & (FXF_WRITE | FXF_CREAT)):
786             def _readonly_close():
787                 if self.consumer:
788                     self.consumer.close()
789             return defer.execute(_readonly_close)
790
791         # We must capture the abandoned, parent, and childname variables synchronously
792         # at the close call. This is needed by the correctness arguments in the comments
793         # for _abandon_any_heisenfiles and _rename_heisenfiles.
794         # Note that the file must have been opened before it can be closed.
795         abandoned = self.abandoned
796         parent = self.parent
797         childname = self.childname
798
799         # has_changed is set when writeChunk is called, not when the write occurs, so
800         # it is correct to optimize out the commit if it is False at the close call.
801         has_changed = self.has_changed
802
803         def _committed(res):
804             if noisy: self.log("_committed(%r)" % (res,), level=NOISY)
805
806             self.consumer.close()
807
808             # We must close_notify before re-firing self.async.
809             if self.close_notify:
810                 self.close_notify(self.userpath, self.parent, self.childname, self)
811             return res
812
813         def _close(ign):
814             d2 = self.consumer.when_done()
815             if self.filenode and self.filenode.is_mutable():
816                 self.log("update mutable file %r childname=%r metadata=%r" % (self.filenode, childname, self.metadata), level=OPERATIONAL)
817                 if self.metadata.get('no-write', False) and not self.filenode.is_readonly():
818                     assert parent and childname, (parent, childname, self.metadata)
819                     d2.addCallback(lambda ign: parent.set_metadata_for(childname, self.metadata))
820
821                 d2.addCallback(lambda ign: self.filenode.overwrite(MutableFileHandle(self.consumer.get_file())))
822             else:
823                 def _add_file(ign):
824                     self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
825                     u = FileHandle(self.consumer.get_file(), self.convergence)
826                     return parent.add_file(childname, u, metadata=self.metadata)
827                 d2.addCallback(_add_file)
828
829             d2.addBoth(_committed)
830             return d2
831
832         d = defer.Deferred()
833
834         # If the file has been abandoned, we don't want the close operation to get "stuck",
835         # even if self.async fails to re-fire. Doing the close independently of self.async
836         # in that case ensures that dropping an ssh connection is sufficient to abandon
837         # any heisenfiles that were not explicitly closed in that connection.
838         if abandoned or not has_changed:
839             d.addCallback(_committed)
840         else:
841             self.async.addCallback(_close)
842
843         self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
844         d.addBoth(_convert_error, request)
845         return d
846
847     def getAttrs(self):
848         request = ".getAttrs()"
849         self.log(request, level=OPERATIONAL)
850
851         if self.closed:
852             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
853             return defer.execute(_closed)
854
855         # Optimization for read-only handles, when we already know the metadata.
856         if not (self.flags & (FXF_WRITE | FXF_CREAT)) and self.metadata and self.filenode and not self.filenode.is_mutable():
857             return defer.succeed(_populate_attrs(self.filenode, self.metadata))
858
859         d = defer.Deferred()
860         def _get(ign):
861             if noisy: self.log("_get(%r) in %r, filenode = %r, metadata = %r" % (ign, request, self.filenode, self.metadata), level=NOISY)
862
863             # self.filenode might be None, but that's ok.
864             attrs = _populate_attrs(self.filenode, self.metadata, size=self.consumer.get_current_size())
865             eventually(d.callback, attrs)
866             return None
867         self.async.addCallbacks(_get, eventually_errback(d))
868         d.addBoth(_convert_error, request)
869         return d
870
871     def setAttrs(self, attrs, only_if_at=None):
872         request = ".setAttrs(%r, only_if_at=%r)" % (attrs, only_if_at)
873         self.log(request, level=OPERATIONAL)
874
875         if not (self.flags & FXF_WRITE):
876             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
877             return defer.execute(_denied)
878
879         if self.closed:
880             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot set attributes for a closed file handle")
881             return defer.execute(_closed)
882
883         size = attrs.get("size", None)
884         if size is not None and (not isinstance(size, (int, long)) or size < 0):
885             def _bad(): raise SFTPError(FX_BAD_MESSAGE, "new size is not a valid nonnegative integer")
886             return defer.execute(_bad)
887
888         d = defer.Deferred()
889         def _set(ign):
890             if noisy: self.log("_set(%r) in %r" % (ign, request), level=NOISY)
891             current_direntry = _direntry_for(self.parent, self.childname, self.filenode)
892             if only_if_at and only_if_at != current_direntry:
893                 if noisy: self.log("not setting attributes: current_direntry=%r in %r" %
894                                    (current_direntry, request), level=NOISY)
895                 return None
896
897             now = time()
898             self.metadata = update_metadata(self.metadata, _attrs_to_metadata(attrs), now)
899             if size is not None:
900                 # TODO: should we refuse to truncate a file opened with FXF_APPEND?
901                 # <http://allmydata.org/trac/tahoe-lafs/ticket/1037#comment:20>
902                 self.consumer.set_current_size(size)
903             eventually(d.callback, None)
904             return None
905         self.async.addCallbacks(_set, eventually_errback(d))
906         d.addBoth(_convert_error, request)
907         return d
908
909
910 class StoppableList:
911     def __init__(self, items):
912         self.items = items
913     def __iter__(self):
914         for i in self.items:
915             yield i
916     def close(self):
917         pass
918
919
920 class Reason:
921     def __init__(self, value):
922         self.value = value
923
924
925 # A "heisenfile" is a file that has been opened with write flags
926 # (FXF_WRITE and/or FXF_CREAT) and not yet close-notified.
927 # 'all_heisenfiles' maps from a direntry string to a list of
928 # GeneralSFTPFile.
929 #
930 # A direntry string is parent_write_uri + "/" + childname_utf8 for
931 # an immutable file, or file_write_uri for a mutable file.
932 # Updates to this dict are single-threaded.
933
934 all_heisenfiles = {}
935
936 def _reload():
937     global all_heisenfiles
938     all_heisenfiles = {}
939
940 class SFTPUserHandler(ConchUser, PrefixingLogMixin):
941     implements(ISFTPServer)
942     def __init__(self, client, rootnode, username):
943         ConchUser.__init__(self)
944         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=username)
945         if noisy: self.log(".__init__(%r, %r, %r)" % (client, rootnode, username), level=NOISY)
946
947         self.channelLookup["session"] = session.SSHSession
948         self.subsystemLookup["sftp"] = FileTransferServer
949
950         self._client = client
951         self._root = rootnode
952         self._username = username
953         self._convergence = client.convergence
954
955         # maps from UTF-8 paths for this user, to files written and still open
956         self._heisenfiles = {}
957
958     def gotVersion(self, otherVersion, extData):
959         self.log(".gotVersion(%r, %r)" % (otherVersion, extData), level=OPERATIONAL)
960
961         # advertise the same extensions as the OpenSSH SFTP server
962         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
963         return {'posix-rename@openssh.com': '1',
964                 'statvfs@openssh.com': '2',
965                 'fstatvfs@openssh.com': '2',
966                }
967
968     def logout(self):
969         self.log(".logout()", level=OPERATIONAL)
970
971         for files in self._heisenfiles.itervalues():
972             for f in files:
973                 f.abandon()
974
975     def _add_heisenfile_by_path(self, file):
976         self.log("._add_heisenfile_by_path(%r)" % (file,), level=OPERATIONAL)
977
978         userpath = file.get_userpath()
979         if userpath in self._heisenfiles:
980             self._heisenfiles[userpath] += [file]
981         else:
982             self._heisenfiles[userpath] = [file]
983
984     def _add_heisenfile_by_direntry(self, file):
985         self.log("._add_heisenfile_by_direntry(%r)" % (file,), level=OPERATIONAL)
986
987         direntry = file.get_direntry()
988         if direntry:
989             if direntry in all_heisenfiles:
990                 all_heisenfiles[direntry] += [file]
991             else:
992                 all_heisenfiles[direntry] = [file]
993
994     def _abandon_any_heisenfiles(self, userpath, direntry):
995         request = "._abandon_any_heisenfiles(%r, %r)" % (userpath, direntry)
996         self.log(request, level=OPERATIONAL)
997
998         assert isinstance(userpath, str), userpath
999
1000         # First we synchronously mark all heisenfiles matching the userpath or direntry
1001         # as abandoned, and remove them from the two heisenfile dicts. Then we .sync()
1002         # each file that we abandoned.
1003         #
1004         # For each file, the call to .abandon() occurs:
1005         #   * before the file is closed, in which case it will never be committed
1006         #     (uploaded+linked or published); or
1007         #   * after it is closed but before it has been close_notified, in which case the
1008         #     .sync() ensures that it has been committed (successfully or not) before we
1009         #     return.
1010         #
1011         # This avoids a race that might otherwise cause the file to be committed after
1012         # the remove operation has completed.
1013         #
1014         # We return a Deferred that fires with True if any files were abandoned (this
1015         # does not mean that they were not committed; it is used to determine whether
1016         # a NoSuchChildError from the attempt to delete the file should be suppressed).
1017
1018         files = []
1019         if direntry in all_heisenfiles:
1020             files = all_heisenfiles[direntry]
1021             del all_heisenfiles[direntry]
1022         if userpath in self._heisenfiles:
1023             files += self._heisenfiles[userpath]
1024             del self._heisenfiles[userpath]
1025
1026         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1027
1028         for f in files:
1029             f.abandon()
1030
1031         d = defer.succeed(None)
1032         for f in files:
1033             d.addBoth(f.sync)
1034
1035         def _done(ign):
1036             self.log("done %r" % (request,), level=OPERATIONAL)
1037             return len(files) > 0
1038         d.addBoth(_done)
1039         return d
1040
1041     def _rename_heisenfiles(self, from_userpath, from_parent, from_childname,
1042                             to_userpath, to_parent, to_childname, overwrite=True):
1043         request = ("._rename_heisenfiles(%r, %r, %r, %r, %r, %r, overwrite=%r)" %
1044                    (from_userpath, from_parent, from_childname, to_userpath, to_parent, to_childname, overwrite))
1045         self.log(request, level=OPERATIONAL)
1046
1047         assert (isinstance(from_userpath, str) and isinstance(from_childname, unicode) and
1048                 isinstance(to_userpath, str) and isinstance(to_childname, unicode)), \
1049                (from_userpath, from_childname, to_userpath, to_childname)
1050
1051         if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
1052
1053         # First we synchronously rename all heisenfiles matching the userpath or direntry.
1054         # Then we .sync() each file that we renamed.
1055         #
1056         # For each file, the call to .rename occurs:
1057         #   * before the file is closed, in which case it will be committed at the
1058         #     new direntry; or
1059         #   * after it is closed but before it has been close_notified, in which case the
1060         #     .sync() ensures that it has been committed (successfully or not) before we
1061         #     return.
1062         #
1063         # This avoids a race that might otherwise cause the file to be committed at the
1064         # old name after the rename operation has completed.
1065         #
1066         # Note that if overwrite is False, the caller should already have checked
1067         # whether a real direntry exists at the destination. It is possible that another
1068         # direntry (heisen or real) comes to exist at the destination after that check,
1069         # but in that case it is correct for the rename to succeed (and for the commit
1070         # of the heisenfile at the destination to possibly clobber the other entry, since
1071         # that can happen anyway when we have concurrent write handles to the same direntry).
1072         #
1073         # We return a Deferred that fires with True if any files were renamed (this
1074         # does not mean that they were not committed; it is used to determine whether
1075         # a NoSuchChildError from the rename attempt should be suppressed). If overwrite
1076         # is False and there were already heisenfiles at the destination userpath or
1077         # direntry, we return a Deferred that fails with SFTPError(FX_PERMISSION_DENIED).
1078
1079         from_direntry = _direntry_for(from_parent, from_childname)
1080         to_direntry = _direntry_for(to_parent, to_childname)
1081
1082         if noisy: self.log("from_direntry = %r, to_direntry = %r, len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
1083                            (from_direntry, to_direntry, len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
1084
1085         if not overwrite and (to_userpath in self._heisenfiles or to_direntry in all_heisenfiles):
1086             def _existing(): raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1087             if noisy: self.log("existing", level=NOISY)
1088             return defer.execute(_existing)
1089
1090         from_files = []
1091         if from_direntry in all_heisenfiles:
1092             from_files = all_heisenfiles[from_direntry]
1093             del all_heisenfiles[from_direntry]
1094         if from_userpath in self._heisenfiles:
1095             from_files += self._heisenfiles[from_userpath]
1096             del self._heisenfiles[from_userpath]
1097
1098         if noisy: self.log("from_files = %r in %r" % (from_files, request), level=NOISY)
1099
1100         for f in from_files:
1101             f.rename(to_userpath, to_parent, to_childname)
1102             self._add_heisenfile_by_path(f)
1103             self._add_heisenfile_by_direntry(f)
1104
1105         d = defer.succeed(None)
1106         for f in from_files:
1107             d.addBoth(f.sync)
1108
1109         def _done(ign):
1110             if noisy: self.log("done: len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
1111                                (len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
1112             return len(from_files) > 0
1113         d.addBoth(_done)
1114         return d
1115
1116     def _update_attrs_for_heisenfiles(self, userpath, direntry, attrs):
1117         request = "._update_attrs_for_heisenfiles(%r, %r, %r)" % (userpath, direntry, attrs)
1118         self.log(request, level=OPERATIONAL)
1119
1120         assert isinstance(userpath, str) and isinstance(direntry, str), (userpath, direntry)
1121
1122         files = []
1123         if direntry in all_heisenfiles:
1124             files = all_heisenfiles[direntry]
1125         if userpath in self._heisenfiles:
1126             files += self._heisenfiles[userpath]
1127
1128         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1129
1130         # We set the metadata for all heisenfiles at this path or direntry.
1131         # Since a direntry includes a write URI, we must have authority to
1132         # change the metadata of heisenfiles found in the all_heisenfiles dict.
1133         # However that's not necessarily the case for heisenfiles found by
1134         # path. Therefore we tell the setAttrs method of each file to only
1135         # perform the update if the file is at the correct direntry.
1136
1137         d = defer.succeed(None)
1138         for f in files:
1139             d.addBoth(f.setAttrs, attrs, only_if_at=direntry)
1140
1141         def _done(ign):
1142             self.log("done %r" % (request,), level=OPERATIONAL)
1143             # TODO: this should not return True if only_if_at caused all files to be skipped.
1144             return len(files) > 0
1145         d.addBoth(_done)
1146         return d
1147
1148     def _sync_heisenfiles(self, userpath, direntry, ignore=None):
1149         request = "._sync_heisenfiles(%r, %r, ignore=%r)" % (userpath, direntry, ignore)
1150         self.log(request, level=OPERATIONAL)
1151
1152         assert isinstance(userpath, str) and isinstance(direntry, (str, NoneType)), (userpath, direntry)
1153
1154         files = []
1155         if direntry in all_heisenfiles:
1156             files = all_heisenfiles[direntry]
1157         if userpath in self._heisenfiles:
1158             files += self._heisenfiles[userpath]
1159
1160         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1161
1162         d = defer.succeed(None)
1163         for f in files:
1164             if f is not ignore:
1165                 d.addBoth(f.sync)
1166
1167         def _done(ign):
1168             self.log("done %r" % (request,), level=OPERATIONAL)
1169             return None
1170         d.addBoth(_done)
1171         return d
1172
1173     def _remove_heisenfile(self, userpath, parent, childname, file_to_remove):
1174         if noisy: self.log("._remove_heisenfile(%r, %r, %r, %r)" % (userpath, parent, childname, file_to_remove), level=NOISY)
1175
1176         assert isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)), (userpath, childname)
1177
1178         direntry = _direntry_for(parent, childname)
1179         if direntry in all_heisenfiles:
1180             all_old_files = all_heisenfiles[direntry]
1181             all_new_files = [f for f in all_old_files if f is not file_to_remove]
1182             if len(all_new_files) > 0:
1183                 all_heisenfiles[direntry] = all_new_files
1184             else:
1185                 del all_heisenfiles[direntry]
1186
1187         if userpath in self._heisenfiles:
1188             old_files = self._heisenfiles[userpath]
1189             new_files = [f for f in old_files if f is not file_to_remove]
1190             if len(new_files) > 0:
1191                 self._heisenfiles[userpath] = new_files
1192             else:
1193                 del self._heisenfiles[userpath]
1194
1195         if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
1196
1197     def _make_file(self, existing_file, userpath, flags, parent=None, childname=None, filenode=None, metadata=None):
1198         if noisy: self.log("._make_file(%r, %r, %r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
1199                            (existing_file, userpath, flags, _repr_flags(flags), parent, childname, filenode, metadata),
1200                            level=NOISY)
1201
1202         assert (isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)) and
1203                 (metadata is None or 'no-write' in metadata)), (userpath, childname, metadata)
1204
1205         writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
1206         direntry = _direntry_for(parent, childname, filenode)
1207
1208         d = self._sync_heisenfiles(userpath, direntry, ignore=existing_file)
1209
1210         if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
1211             d.addCallback(lambda ign: ShortReadOnlySFTPFile(userpath, filenode, metadata))
1212         else:
1213             close_notify = None
1214             if writing:
1215                 close_notify = self._remove_heisenfile
1216
1217             d.addCallback(lambda ign: existing_file or GeneralSFTPFile(userpath, flags, close_notify, self._convergence))
1218             def _got_file(file):
1219                 file.open(parent=parent, childname=childname, filenode=filenode, metadata=metadata)
1220                 if writing:
1221                     self._add_heisenfile_by_direntry(file)
1222                 return file
1223             d.addCallback(_got_file)
1224         return d
1225
1226     def openFile(self, pathstring, flags, attrs, delay=None):
1227         request = ".openFile(%r, %r = %r, %r, delay=%r)" % (pathstring, flags, _repr_flags(flags), attrs, delay)
1228         self.log(request, level=OPERATIONAL)
1229
1230         # This is used for both reading and writing.
1231         # First exclude invalid combinations of flags, and empty paths.
1232
1233         if not (flags & (FXF_READ | FXF_WRITE)):
1234             def _bad_readwrite():
1235                 raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
1236             return defer.execute(_bad_readwrite)
1237
1238         if (flags & FXF_EXCL) and not (flags & FXF_CREAT):
1239             def _bad_exclcreat():
1240                 raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT")
1241             return defer.execute(_bad_exclcreat)
1242
1243         path = self._path_from_string(pathstring)
1244         if not path:
1245             def _emptypath(): raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty")
1246             return defer.execute(_emptypath)
1247
1248         # The combination of flags is potentially valid.
1249
1250         # To work around clients that have race condition bugs, a getAttr, rename, or
1251         # remove request following an 'open' request with FXF_WRITE or FXF_CREAT flags,
1252         # should succeed even if the 'open' request has not yet completed. So we now
1253         # synchronously add a file object into the self._heisenfiles dict, indexed
1254         # by its UTF-8 userpath. (We can't yet add it to the all_heisenfiles dict,
1255         # because we don't yet have a user-independent path for the file.) The file
1256         # object does not know its filenode, parent, or childname at this point.
1257
1258         userpath = self._path_to_utf8(path)
1259
1260         if flags & (FXF_WRITE | FXF_CREAT):
1261             file = GeneralSFTPFile(userpath, flags, self._remove_heisenfile, self._convergence)
1262             self._add_heisenfile_by_path(file)
1263         else:
1264             # We haven't decided which file implementation to use yet.
1265             file = None
1266
1267         desired_metadata = _attrs_to_metadata(attrs)
1268
1269         # Now there are two major cases:
1270         #
1271         #  1. The path is specified as /uri/FILECAP, with no parent directory.
1272         #     If the FILECAP is mutable and writeable, then we can open it in write-only
1273         #     or read/write mode (non-exclusively), otherwise we can only open it in
1274         #     read-only mode. The open should succeed immediately as long as FILECAP is
1275         #     a valid known filecap that grants the required permission.
1276         #
1277         #  2. The path is specified relative to a parent. We find the parent dirnode and
1278         #     get the child's URI and metadata if it exists. There are four subcases:
1279         #       a. the child does not exist: FXF_CREAT must be set, and we must be able
1280         #          to write to the parent directory.
1281         #       b. the child exists but is not a valid known filecap: fail
1282         #       c. the child is mutable: if we are trying to open it write-only or
1283         #          read/write, then we must be able to write to the file.
1284         #       d. the child is immutable: if we are trying to open it write-only or
1285         #          read/write, then we must be able to write to the parent directory.
1286         #
1287         # To reduce latency, open normally succeeds as soon as these conditions are
1288         # met, even though there might be a failure in downloading the existing file
1289         # or uploading a new one. However, there is an exception: if a file has been
1290         # written, then closed, and is now being reopened, then we have to delay the
1291         # open until the previous upload/publish has completed. This is necessary
1292         # because sshfs does not wait for the result of an FXF_CLOSE message before
1293         # reporting to the client that a file has been closed. It applies both to
1294         # mutable files, and to directory entries linked to an immutable file.
1295         #
1296         # Note that the permission checks below are for more precise error reporting on
1297         # the open call; later operations would fail even if we did not make these checks.
1298
1299         d = delay or defer.succeed(None)
1300         d.addCallback(lambda ign: self._get_root(path))
1301         def _got_root( (root, path) ):
1302             if root.is_unknown():
1303                 raise SFTPError(FX_PERMISSION_DENIED,
1304                                 "cannot open an unknown cap (or child of an unknown object). "
1305                                 "Upgrading the gateway to a later Tahoe-LAFS version may help")
1306             if not path:
1307                 # case 1
1308                 if noisy: self.log("case 1: root = %r, path[:-1] = %r" % (root, path[:-1]), level=NOISY)
1309                 if not IFileNode.providedBy(root):
1310                     raise SFTPError(FX_PERMISSION_DENIED,
1311                                     "cannot open a directory cap")
1312                 if (flags & FXF_WRITE) and root.is_readonly():
1313                     raise SFTPError(FX_PERMISSION_DENIED,
1314                                     "cannot write to a non-writeable filecap without a parent directory")
1315                 if flags & FXF_EXCL:
1316                     raise SFTPError(FX_FAILURE,
1317                                     "cannot create a file exclusively when it already exists")
1318
1319                 # The file does not need to be added to all_heisenfiles, because it is not
1320                 # associated with a directory entry that needs to be updated.
1321
1322                 metadata = update_metadata(None, desired_metadata, time())
1323
1324                 # We have to decide what to pass for the 'parent_readonly' argument to _no_write,
1325                 # given that we don't actually have a parent. This only affects the permissions
1326                 # reported by a getAttrs on this file handle in the case of an immutable file.
1327                 # We choose 'parent_readonly=True' since that will cause the permissions to be
1328                 # reported as r--r--r--, which is appropriate because an immutable file can't be
1329                 # written via this path.
1330
1331                 metadata['no-write'] = _no_write(True, root)
1332                 return self._make_file(file, userpath, flags, filenode=root, metadata=metadata)
1333             else:
1334                 # case 2
1335                 childname = path[-1]
1336
1337                 if noisy: self.log("case 2: root = %r, childname = %r, desired_metadata = %r, path[:-1] = %r" %
1338                                    (root, childname, desired_metadata, path[:-1]), level=NOISY)
1339                 d2 = root.get_child_at_path(path[:-1])
1340                 def _got_parent(parent):
1341                     if noisy: self.log("_got_parent(%r)" % (parent,), level=NOISY)
1342                     if parent.is_unknown():
1343                         raise SFTPError(FX_PERMISSION_DENIED,
1344                                         "cannot open a child of an unknown object. "
1345                                         "Upgrading the gateway to a later Tahoe-LAFS version may help")
1346
1347                     parent_readonly = parent.is_readonly()
1348                     d3 = defer.succeed(None)
1349                     if flags & FXF_EXCL:
1350                         # FXF_EXCL means that the link to the file (not the file itself) must
1351                         # be created atomically wrt updates by this storage client.
1352                         # That is, we need to create the link before returning success to the
1353                         # SFTP open request (and not just on close, as would normally be the
1354                         # case). We make the link initially point to a zero-length LIT file,
1355                         # which is consistent with what might happen on a POSIX filesystem.
1356
1357                         if parent_readonly:
1358                             raise SFTPError(FX_FAILURE,
1359                                             "cannot create a file exclusively when the parent directory is read-only")
1360
1361                         # 'overwrite=False' ensures failure if the link already exists.
1362                         # FIXME: should use a single call to set_uri and return (child, metadata) (#1035)
1363
1364                         zero_length_lit = "URI:LIT:"
1365                         if noisy: self.log("%r.set_uri(%r, None, readcap=%r, overwrite=False)" %
1366                                            (parent, zero_length_lit, childname), level=NOISY)
1367                         d3.addCallback(lambda ign: parent.set_uri(childname, None, readcap=zero_length_lit,
1368                                                                   metadata=desired_metadata, overwrite=False))
1369                         def _seturi_done(child):
1370                             if noisy: self.log("%r.get_metadata_for(%r)" % (parent, childname), level=NOISY)
1371                             d4 = parent.get_metadata_for(childname)
1372                             d4.addCallback(lambda metadata: (child, metadata))
1373                             return d4
1374                         d3.addCallback(_seturi_done)
1375                     else:
1376                         if noisy: self.log("%r.get_child_and_metadata(%r)" % (parent, childname), level=NOISY)
1377                         d3.addCallback(lambda ign: parent.get_child_and_metadata(childname))
1378
1379                     def _got_child( (filenode, current_metadata) ):
1380                         if noisy: self.log("_got_child( (%r, %r) )" % (filenode, current_metadata), level=NOISY)
1381
1382                         metadata = update_metadata(current_metadata, desired_metadata, time())
1383
1384                         # Ignore the permissions of the desired_metadata in an open call. The permissions
1385                         # can only be set by setAttrs.
1386                         metadata['no-write'] = _no_write(parent_readonly, filenode, current_metadata)
1387
1388                         if filenode.is_unknown():
1389                             raise SFTPError(FX_PERMISSION_DENIED,
1390                                             "cannot open an unknown cap. Upgrading the gateway "
1391                                             "to a later Tahoe-LAFS version may help")
1392                         if not IFileNode.providedBy(filenode):
1393                             raise SFTPError(FX_PERMISSION_DENIED,
1394                                             "cannot open a directory as if it were a file")
1395                         if (flags & FXF_WRITE) and metadata['no-write']:
1396                             raise SFTPError(FX_PERMISSION_DENIED,
1397                                             "cannot open a non-writeable file for writing")
1398
1399                         return self._make_file(file, userpath, flags, parent=parent, childname=childname,
1400                                                filenode=filenode, metadata=metadata)
1401                     def _no_child(f):
1402                         if noisy: self.log("_no_child(%r)" % (f,), level=NOISY)
1403                         f.trap(NoSuchChildError)
1404
1405                         if not (flags & FXF_CREAT):
1406                             raise SFTPError(FX_NO_SUCH_FILE,
1407                                             "the file does not exist, and was not opened with the creation (CREAT) flag")
1408                         if parent_readonly:
1409                             raise SFTPError(FX_PERMISSION_DENIED,
1410                                             "cannot create a file when the parent directory is read-only")
1411
1412                         return self._make_file(file, userpath, flags, parent=parent, childname=childname)
1413                     d3.addCallbacks(_got_child, _no_child)
1414                     return d3
1415
1416                 d2.addCallback(_got_parent)
1417                 return d2
1418
1419         d.addCallback(_got_root)
1420         def _remove_on_error(err):
1421             if file:
1422                 self._remove_heisenfile(userpath, None, None, file)
1423             return err
1424         d.addErrback(_remove_on_error)
1425         d.addBoth(_convert_error, request)
1426         return d
1427
1428     def renameFile(self, from_pathstring, to_pathstring, overwrite=False):
1429         request = ".renameFile(%r, %r)" % (from_pathstring, to_pathstring)
1430         self.log(request, level=OPERATIONAL)
1431
1432         from_path = self._path_from_string(from_pathstring)
1433         to_path = self._path_from_string(to_pathstring)
1434         from_userpath = self._path_to_utf8(from_path)
1435         to_userpath = self._path_to_utf8(to_path)
1436
1437         # the target directory must already exist
1438         d = deferredutil.gatherResults([self._get_parent_or_node(from_path),
1439                                         self._get_parent_or_node(to_path)])
1440         def _got( (from_pair, to_pair) ):
1441             if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r, overwrite=%r)" %
1442                                (from_pair, to_pair, from_pathstring, to_pathstring, overwrite), level=NOISY)
1443             (from_parent, from_childname) = from_pair
1444             (to_parent, to_childname) = to_pair
1445
1446             if from_childname is None:
1447                 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename a source object specified by URI")
1448             if to_childname is None:
1449                 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename to a destination specified by URI")
1450
1451             # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.5>
1452             # "It is an error if there already exists a file with the name specified
1453             #  by newpath."
1454             # OpenSSH's SFTP server returns FX_PERMISSION_DENIED for this error.
1455             #
1456             # For the standard SSH_FXP_RENAME operation, overwrite=False.
1457             # We also support the posix-rename@openssh.com extension, which uses overwrite=True.
1458
1459             d2 = defer.succeed(None)
1460             if not overwrite:
1461                 d2.addCallback(lambda ign: to_parent.get(to_childname))
1462                 def _expect_fail(res):
1463                     if not isinstance(res, Failure):
1464                         raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1465
1466                     # It is OK if we fail for errors other than NoSuchChildError, since that probably
1467                     # indicates some problem accessing the destination directory.
1468                     res.trap(NoSuchChildError)
1469                 d2.addBoth(_expect_fail)
1470
1471             # If there are heisenfiles to be written at the 'from' direntry, then ensure
1472             # they will now be written at the 'to' direntry instead.
1473             d2.addCallback(lambda ign:
1474                            self._rename_heisenfiles(from_userpath, from_parent, from_childname,
1475                                                     to_userpath, to_parent, to_childname, overwrite=overwrite))
1476
1477             def _move(renamed):
1478                 # FIXME: use move_child_to_path to avoid possible data loss due to #943
1479                 #d3 = from_parent.move_child_to_path(from_childname, to_root, to_path, overwrite=overwrite)
1480
1481                 d3 = from_parent.move_child_to(from_childname, to_parent, to_childname, overwrite=overwrite)
1482                 def _check(err):
1483                     if noisy: self.log("_check(%r) in .renameFile(%r, %r, overwrite=%r)" %
1484                                        (err, from_pathstring, to_pathstring, overwrite), level=NOISY)
1485
1486                     if not isinstance(err, Failure) or (renamed and err.check(NoSuchChildError)):
1487                         return None
1488                     if not overwrite and err.check(ExistingChildError):
1489                         raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1490
1491                     return err
1492                 d3.addBoth(_check)
1493                 return d3
1494             d2.addCallback(_move)
1495             return d2
1496         d.addCallback(_got)
1497         d.addBoth(_convert_error, request)
1498         return d
1499
1500     def makeDirectory(self, pathstring, attrs):
1501         request = ".makeDirectory(%r, %r)" % (pathstring, attrs)
1502         self.log(request, level=OPERATIONAL)
1503
1504         path = self._path_from_string(pathstring)
1505         metadata = _attrs_to_metadata(attrs)
1506         if 'no-write' in metadata:
1507             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "cannot create a directory that is initially read-only")
1508             return defer.execute(_denied)
1509
1510         d = self._get_root(path)
1511         d.addCallback(lambda (root, path):
1512                       self._get_or_create_directories(root, path, metadata))
1513         d.addBoth(_convert_error, request)
1514         return d
1515
1516     def _get_or_create_directories(self, node, path, metadata):
1517         if not IDirectoryNode.providedBy(node):
1518             # TODO: provide the name of the blocking file in the error message.
1519             def _blocked(): raise SFTPError(FX_FAILURE, "cannot create directory because there "
1520                                                         "is a file in the way") # close enough
1521             return defer.execute(_blocked)
1522
1523         if not path:
1524             return defer.succeed(node)
1525         d = node.get(path[0])
1526         def _maybe_create(f):
1527             f.trap(NoSuchChildError)
1528             return node.create_subdirectory(path[0])
1529         d.addErrback(_maybe_create)
1530         d.addCallback(self._get_or_create_directories, path[1:], metadata)
1531         return d
1532
1533     def removeFile(self, pathstring):
1534         request = ".removeFile(%r)" % (pathstring,)
1535         self.log(request, level=OPERATIONAL)
1536
1537         path = self._path_from_string(pathstring)
1538         d = self._remove_object(path, must_be_file=True)
1539         d.addBoth(_convert_error, request)
1540         return d
1541
1542     def removeDirectory(self, pathstring):
1543         request = ".removeDirectory(%r)" % (pathstring,)
1544         self.log(request, level=OPERATIONAL)
1545
1546         path = self._path_from_string(pathstring)
1547         d = self._remove_object(path, must_be_directory=True)
1548         d.addBoth(_convert_error, request)
1549         return d
1550
1551     def _remove_object(self, path, must_be_directory=False, must_be_file=False):
1552         userpath = self._path_to_utf8(path)
1553         d = self._get_parent_or_node(path)
1554         def _got_parent( (parent, childname) ):
1555             if childname is None:
1556                 raise SFTPError(FX_NO_SUCH_FILE, "cannot remove an object specified by URI")
1557
1558             direntry = _direntry_for(parent, childname)
1559             d2 = defer.succeed(False)
1560             if not must_be_directory:
1561                 d2.addCallback(lambda ign: self._abandon_any_heisenfiles(userpath, direntry))
1562
1563             d2.addCallback(lambda abandoned:
1564                            parent.delete(childname, must_exist=not abandoned,
1565                                          must_be_directory=must_be_directory, must_be_file=must_be_file))
1566             return d2
1567         d.addCallback(_got_parent)
1568         return d
1569
1570     def openDirectory(self, pathstring):
1571         request = ".openDirectory(%r)" % (pathstring,)
1572         self.log(request, level=OPERATIONAL)
1573
1574         path = self._path_from_string(pathstring)
1575         d = self._get_parent_or_node(path)
1576         def _got_parent_or_node( (parent_or_node, childname) ):
1577             if noisy: self.log("_got_parent_or_node( (%r, %r) ) in openDirectory(%r)" %
1578                                (parent_or_node, childname, pathstring), level=NOISY)
1579             if childname is None:
1580                 return parent_or_node
1581             else:
1582                 return parent_or_node.get(childname)
1583         d.addCallback(_got_parent_or_node)
1584         def _list(dirnode):
1585             if dirnode.is_unknown():
1586                 raise SFTPError(FX_PERMISSION_DENIED,
1587                                 "cannot list an unknown cap as a directory. Upgrading the gateway "
1588                                 "to a later Tahoe-LAFS version may help")
1589             if not IDirectoryNode.providedBy(dirnode):
1590                 raise SFTPError(FX_PERMISSION_DENIED,
1591                                 "cannot list a file as if it were a directory")
1592
1593             d2 = dirnode.list()
1594             def _render(children):
1595                 parent_readonly = dirnode.is_readonly()
1596                 results = []
1597                 for filename, (child, metadata) in children.iteritems():
1598                     # The file size may be cached or absent.
1599                     metadata['no-write'] = _no_write(parent_readonly, child, metadata)
1600                     attrs = _populate_attrs(child, metadata)
1601                     filename_utf8 = filename.encode('utf-8')
1602                     longname = _lsLine(filename_utf8, attrs)
1603                     results.append( (filename_utf8, longname, attrs) )
1604                 return StoppableList(results)
1605             d2.addCallback(_render)
1606             return d2
1607         d.addCallback(_list)
1608         d.addBoth(_convert_error, request)
1609         return d
1610
1611     def getAttrs(self, pathstring, followLinks):
1612         request = ".getAttrs(%r, followLinks=%r)" % (pathstring, followLinks)
1613         self.log(request, level=OPERATIONAL)
1614
1615         # When asked about a specific file, report its current size.
1616         # TODO: the modification time for a mutable file should be
1617         # reported as the update time of the best version. But that
1618         # information isn't currently stored in mutable shares, I think.
1619
1620         path = self._path_from_string(pathstring)
1621         userpath = self._path_to_utf8(path)
1622         d = self._get_parent_or_node(path)
1623         def _got_parent_or_node( (parent_or_node, childname) ):
1624             if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
1625
1626             # Some clients will incorrectly try to get the attributes
1627             # of a file immediately after opening it, before it has been put
1628             # into the all_heisenfiles table. This is a race condition bug in
1629             # the client, but we handle it anyway by calling .sync() on all
1630             # files matching either the path or the direntry.
1631
1632             direntry = _direntry_for(parent_or_node, childname)
1633             d2 = self._sync_heisenfiles(userpath, direntry)
1634
1635             if childname is None:
1636                 node = parent_or_node
1637                 d2.addCallback(lambda ign: node.get_current_size())
1638                 d2.addCallback(lambda size:
1639                                _populate_attrs(node, {'no-write': node.is_unknown() or node.is_readonly()}, size=size))
1640             else:
1641                 parent = parent_or_node
1642                 d2.addCallback(lambda ign: parent.get_child_and_metadata_at_path([childname]))
1643                 def _got( (child, metadata) ):
1644                     if noisy: self.log("_got( (%r, %r) )" % (child, metadata), level=NOISY)
1645                     assert IDirectoryNode.providedBy(parent), parent
1646                     metadata['no-write'] = _no_write(parent.is_readonly(), child, metadata)
1647                     d3 = child.get_current_size()
1648                     d3.addCallback(lambda size: _populate_attrs(child, metadata, size=size))
1649                     return d3
1650                 def _nosuch(err):
1651                     if noisy: self.log("_nosuch(%r)" % (err,), level=NOISY)
1652                     err.trap(NoSuchChildError)
1653                     if noisy: self.log("checking open files:\nself._heisenfiles = %r\nall_heisenfiles = %r\ndirentry=%r" %
1654                                        (self._heisenfiles, all_heisenfiles, direntry), level=NOISY)
1655                     if direntry in all_heisenfiles:
1656                         files = all_heisenfiles[direntry]
1657                         if len(files) == 0:  # pragma: no cover
1658                             return err
1659                         # use the heisenfile that was most recently opened
1660                         return files[-1].getAttrs()
1661                     return err
1662                 d2.addCallbacks(_got, _nosuch)
1663             return d2
1664         d.addCallback(_got_parent_or_node)
1665         d.addBoth(_convert_error, request)
1666         return d
1667
1668     def setAttrs(self, pathstring, attrs):
1669         request = ".setAttrs(%r, %r)" % (pathstring, attrs)
1670         self.log(request, level=OPERATIONAL)
1671
1672         if "size" in attrs:
1673             # this would require us to download and re-upload the truncated/extended
1674             # file contents
1675             def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "setAttrs wth size attribute unsupported")
1676             return defer.execute(_unsupported)
1677
1678         path = self._path_from_string(pathstring)
1679         userpath = self._path_to_utf8(path)
1680         d = self._get_parent_or_node(path)
1681         def _got_parent_or_node( (parent_or_node, childname) ):
1682             if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
1683
1684             direntry = _direntry_for(parent_or_node, childname)
1685             d2 = self._update_attrs_for_heisenfiles(userpath, direntry, attrs)
1686
1687             def _update(updated_heisenfiles):
1688                 if childname is None:
1689                     if updated_heisenfiles:
1690                         return None
1691                     raise SFTPError(FX_NO_SUCH_FILE, userpath)
1692                 else:
1693                     desired_metadata = _attrs_to_metadata(attrs)
1694                     if noisy: self.log("desired_metadata = %r" % (desired_metadata,), level=NOISY)
1695
1696                     d3 = parent_or_node.set_metadata_for(childname, desired_metadata)
1697                     def _nosuch(err):
1698                         if updated_heisenfiles:
1699                             err.trap(NoSuchChildError)
1700                         else:
1701                             return err
1702                     d3.addErrback(_nosuch)
1703                     return d3
1704             d2.addCallback(_update)
1705             d2.addCallback(lambda ign: None)
1706             return d2
1707         d.addCallback(_got_parent_or_node)
1708         d.addBoth(_convert_error, request)
1709         return d
1710
1711     def readLink(self, pathstring):
1712         self.log(".readLink(%r)" % (pathstring,), level=OPERATIONAL)
1713
1714         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "readLink")
1715         return defer.execute(_unsupported)
1716
1717     def makeLink(self, linkPathstring, targetPathstring):
1718         self.log(".makeLink(%r, %r)" % (linkPathstring, targetPathstring), level=OPERATIONAL)
1719
1720         # If this is implemented, note the reversal of arguments described in point 7 of
1721         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>.
1722
1723         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "makeLink")
1724         return defer.execute(_unsupported)
1725
1726     def extendedRequest(self, extensionName, extensionData):
1727         self.log(".extendedRequest(%r, <data of length %r>)" % (extensionName, len(extensionData)), level=OPERATIONAL)
1728
1729         # We implement the three main OpenSSH SFTP extensions; see
1730         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
1731
1732         if extensionName == 'posix-rename@openssh.com':
1733             def _bad(): raise SFTPError(FX_BAD_MESSAGE, "could not parse posix-rename@openssh.com request")
1734
1735             if 4 > len(extensionData): return defer.execute(_bad)
1736             (fromPathLen,) = struct.unpack('>L', extensionData[0:4])
1737             if 8 + fromPathLen > len(extensionData): return defer.execute(_bad)
1738
1739             (toPathLen,) = struct.unpack('>L', extensionData[(4 + fromPathLen):(8 + fromPathLen)])
1740             if 8 + fromPathLen + toPathLen != len(extensionData): return defer.execute(_bad)
1741
1742             fromPathstring = extensionData[4:(4 + fromPathLen)]
1743             toPathstring = extensionData[(8 + fromPathLen):]
1744             d = self.renameFile(fromPathstring, toPathstring, overwrite=True)
1745
1746             # Twisted conch assumes that the response from an extended request is either
1747             # an error, or an FXP_EXTENDED_REPLY. But it happens to do the right thing
1748             # (respond with an FXP_STATUS message) if we return a Failure with code FX_OK.
1749             def _succeeded(ign):
1750                 raise SFTPError(FX_OK, "request succeeded")
1751             d.addCallback(_succeeded)
1752             return d
1753
1754         if extensionName == 'statvfs@openssh.com' or extensionName == 'fstatvfs@openssh.com':
1755             # f_bsize and f_frsize should be the same to avoid a bug in 'df'
1756             return defer.succeed(struct.pack('>11Q',
1757                 1024,         # uint64  f_bsize     /* file system block size */
1758                 1024,         # uint64  f_frsize    /* fundamental fs block size */
1759                 628318530,    # uint64  f_blocks    /* number of blocks (unit f_frsize) */
1760                 314159265,    # uint64  f_bfree     /* free blocks in file system */
1761                 314159265,    # uint64  f_bavail    /* free blocks for non-root */
1762                 200000000,    # uint64  f_files     /* total file inodes */
1763                 100000000,    # uint64  f_ffree     /* free file inodes */
1764                 100000000,    # uint64  f_favail    /* free file inodes for non-root */
1765                 0x1AF5,       # uint64  f_fsid      /* file system id */
1766                 2,            # uint64  f_flag      /* bit mask = ST_NOSUID; not ST_RDONLY */
1767                 65535,        # uint64  f_namemax   /* maximum filename length */
1768                 ))
1769
1770         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "unsupported %r request <data of length %r>" %
1771                                                                (extensionName, len(extensionData)))
1772         return defer.execute(_unsupported)
1773
1774     def realPath(self, pathstring):
1775         self.log(".realPath(%r)" % (pathstring,), level=OPERATIONAL)
1776
1777         return self._path_to_utf8(self._path_from_string(pathstring))
1778
1779     def _path_to_utf8(self, path):
1780         return (u"/" + u"/".join(path)).encode('utf-8')
1781
1782     def _path_from_string(self, pathstring):
1783         if noisy: self.log("CONVERT %r" % (pathstring,), level=NOISY)
1784
1785         assert isinstance(pathstring, str), pathstring
1786
1787         # The home directory is the root directory.
1788         pathstring = pathstring.strip("/")
1789         if pathstring == "" or pathstring == ".":
1790             path_utf8 = []
1791         else:
1792             path_utf8 = pathstring.split("/")
1793
1794         # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.2>
1795         # "Servers SHOULD interpret a path name component ".." as referring to
1796         #  the parent directory, and "." as referring to the current directory."
1797         path = []
1798         for p_utf8 in path_utf8:
1799             if p_utf8 == "..":
1800                 # ignore excess .. components at the root
1801                 if len(path) > 0:
1802                     path = path[:-1]
1803             elif p_utf8 != ".":
1804                 try:
1805                     p = p_utf8.decode('utf-8', 'strict')
1806                 except UnicodeError:
1807                     raise SFTPError(FX_NO_SUCH_FILE, "path could not be decoded as UTF-8")
1808                 path.append(p)
1809
1810         if noisy: self.log(" PATH %r" % (path,), level=NOISY)
1811         return path
1812
1813     def _get_root(self, path):
1814         # return Deferred (root, remaining_path)
1815         d = defer.succeed(None)
1816         if path and path[0] == u"uri":
1817             d.addCallback(lambda ign: self._client.create_node_from_uri(path[1].encode('utf-8')))
1818             d.addCallback(lambda root: (root, path[2:]))
1819         else:
1820             d.addCallback(lambda ign: (self._root, path))
1821         return d
1822
1823     def _get_parent_or_node(self, path):
1824         # return Deferred (parent, childname) or (node, None)
1825         d = self._get_root(path)
1826         def _got_root( (root, remaining_path) ):
1827             if not remaining_path:
1828                 return (root, None)
1829             else:
1830                 d2 = root.get_child_at_path(remaining_path[:-1])
1831                 d2.addCallback(lambda parent: (parent, remaining_path[-1]))
1832                 return d2
1833         d.addCallback(_got_root)
1834         return d
1835
1836
1837 class FakeTransport:
1838     implements(ITransport)
1839     def write(self, data):
1840         logmsg("FakeTransport.write(<data of length %r>)" % (len(data),), level=NOISY)
1841
1842     def writeSequence(self, data):
1843         logmsg("FakeTransport.writeSequence(...)", level=NOISY)
1844
1845     def loseConnection(self):
1846         logmsg("FakeTransport.loseConnection()", level=NOISY)
1847
1848     # getPeer and getHost can just raise errors, since we don't know what to return
1849
1850
1851 class ShellSession(PrefixingLogMixin):
1852     implements(ISession)
1853     def __init__(self, userHandler):
1854         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
1855         if noisy: self.log(".__init__(%r)" % (userHandler), level=NOISY)
1856
1857     def getPty(self, terminal, windowSize, attrs):
1858         self.log(".getPty(%r, %r, %r)" % (terminal, windowSize, attrs), level=OPERATIONAL)
1859
1860     def openShell(self, protocol):
1861         self.log(".openShell(%r)" % (protocol,), level=OPERATIONAL)
1862         if hasattr(protocol, 'transport') and protocol.transport is None:
1863             protocol.transport = FakeTransport()  # work around Twisted bug
1864
1865         return self._unsupported(protocol)
1866
1867     def execCommand(self, protocol, cmd):
1868         self.log(".execCommand(%r, %r)" % (protocol, cmd), level=OPERATIONAL)
1869         if hasattr(protocol, 'transport') and protocol.transport is None:
1870             protocol.transport = FakeTransport()  # work around Twisted bug
1871
1872         d = defer.succeed(None)
1873         if cmd == "df -P -k /":
1874             d.addCallback(lambda ign: protocol.write(
1875                           "Filesystem         1024-blocks      Used Available Capacity Mounted on\r\n"
1876                           "tahoe                628318530 314159265 314159265      50% /\r\n"))
1877             d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessDone(None))))
1878         else:
1879             d.addCallback(lambda ign: self._unsupported(protocol))
1880         return d
1881
1882     def _unsupported(self, protocol):
1883         d = defer.succeed(None)
1884         d.addCallback(lambda ign: protocol.errReceived(
1885                       "This server supports only the SFTP protocol. It does not support SCP,\r\n"
1886                       "interactive shell sessions, or commands other than one needed by sshfs.\r\n"))
1887         d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessTerminated(exitCode=1))))
1888         return d
1889
1890     def windowChanged(self, newWindowSize):
1891         self.log(".windowChanged(%r)" % (newWindowSize,), level=OPERATIONAL)
1892
1893     def eofReceived(self):
1894         self.log(".eofReceived()", level=OPERATIONAL)
1895
1896     def closed(self):
1897         self.log(".closed()", level=OPERATIONAL)
1898
1899
1900 # If you have an SFTPUserHandler and want something that provides ISession, you get
1901 # ShellSession(userHandler).
1902 # We use adaptation because this must be a different object to the SFTPUserHandler.
1903 components.registerAdapter(ShellSession, SFTPUserHandler, ISession)
1904
1905
1906 from allmydata.frontends.auth import AccountURLChecker, AccountFileChecker, NeedRootcapLookupScheme
1907
1908 class Dispatcher:
1909     implements(portal.IRealm)
1910     def __init__(self, client):
1911         self._client = client
1912
1913     def requestAvatar(self, avatarID, mind, interface):
1914         assert interface == IConchUser, interface
1915         rootnode = self._client.create_node_from_uri(avatarID.rootcap)
1916         handler = SFTPUserHandler(self._client, rootnode, avatarID.username)
1917         return (interface, handler, handler.logout)
1918
1919
1920 class SFTPServer(service.MultiService):
1921     def __init__(self, client, accountfile, accounturl,
1922                  sftp_portstr, pubkey_file, privkey_file):
1923         service.MultiService.__init__(self)
1924
1925         r = Dispatcher(client)
1926         p = portal.Portal(r)
1927
1928         if accountfile:
1929             c = AccountFileChecker(self, accountfile)
1930             p.registerChecker(c)
1931         if accounturl:
1932             c = AccountURLChecker(self, accounturl)
1933             p.registerChecker(c)
1934         if not accountfile and not accounturl:
1935             # we could leave this anonymous, with just the /uri/CAP form
1936             raise NeedRootcapLookupScheme("must provide an account file or URL")
1937
1938         pubkey = keys.Key.fromFile(pubkey_file)
1939         privkey = keys.Key.fromFile(privkey_file)
1940         class SSHFactory(factory.SSHFactory):
1941             publicKeys = {pubkey.sshType(): pubkey}
1942             privateKeys = {privkey.sshType(): privkey}
1943             def getPrimes(self):
1944                 try:
1945                     # if present, this enables diffie-hellman-group-exchange
1946                     return primes.parseModuliFile("/etc/ssh/moduli")
1947                 except IOError:
1948                     return None
1949
1950         f = SSHFactory()
1951         f.portal = p
1952
1953         s = strports.service(sftp_portstr, f)
1954         s.setServiceParent(self)