]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/sftpd.py
4e16d6ab344bc3beba2fa47048d7db00c1a3dbcb
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / sftpd.py
1
2 import heapq, traceback, array, stat, struct
3 from types import NoneType
4 from stat import S_IFREG, S_IFDIR
5 from time import time, strftime, localtime
6
7 from zope.interface import implements
8 from twisted.python import components
9 from twisted.application import service, strports
10 from twisted.conch.ssh import factory, keys, session
11 from twisted.conch.ssh.filetransfer import FileTransferServer, SFTPError, \
12      FX_NO_SUCH_FILE, FX_OP_UNSUPPORTED, FX_PERMISSION_DENIED, FX_EOF, \
13      FX_BAD_MESSAGE, FX_FAILURE, FX_OK
14 from twisted.conch.ssh.filetransfer import FXF_READ, FXF_WRITE, FXF_APPEND, \
15      FXF_CREAT, FXF_TRUNC, FXF_EXCL
16 from twisted.conch.interfaces import ISFTPServer, ISFTPFile, IConchUser, ISession
17 from twisted.conch.avatar import ConchUser
18 from twisted.conch.openssh_compat import primes
19 from twisted.cred import portal
20 from twisted.internet.error import ProcessDone, ProcessTerminated
21 from twisted.python.failure import Failure
22 from twisted.internet.interfaces import ITransport
23
24 from twisted.internet import defer
25 from twisted.internet.interfaces import IFinishableConsumer
26 from foolscap.api import eventually
27 from allmydata.util import deferredutil
28
29 from allmydata.util.assertutil import _assert, precondition
30 from allmydata.util.consumer import download_to_data
31 from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
32      NoSuchChildError, ChildOfWrongTypeError
33 from allmydata.mutable.common import NotWriteableError
34 from allmydata.mutable.publish import MutableFileHandle
35 from allmydata.immutable.upload import FileHandle
36 from allmydata.dirnode import update_metadata
37 from allmydata.util.fileutil import EncryptedTemporaryFile
38
39 noisy = True
40 use_foolscap_logging = True
41
42 from allmydata.util.log import NOISY, OPERATIONAL, WEIRD, \
43     msg as _msg, err as _err, PrefixingLogMixin as _PrefixingLogMixin
44
45 if use_foolscap_logging:
46     (logmsg, logerr, PrefixingLogMixin) = (_msg, _err, _PrefixingLogMixin)
47 else:  # pragma: no cover
48     def logmsg(s, level=None):
49         print s
50     def logerr(s, level=None):
51         print s
52     class PrefixingLogMixin:
53         def __init__(self, facility=None, prefix=''):
54             self.prefix = prefix
55         def log(self, s, level=None):
56             print "%r %s" % (self.prefix, s)
57
58
59 def eventually_callback(d):
60     return lambda res: eventually(d.callback, res)
61
62 def eventually_errback(d):
63     return lambda err: eventually(d.errback, err)
64
65
66 def _utf8(x):
67     if isinstance(x, unicode):
68         return x.encode('utf-8')
69     if isinstance(x, str):
70         return x
71     return repr(x)
72
73
74 def _to_sftp_time(t):
75     """SFTP times are unsigned 32-bit integers representing UTC seconds
76     (ignoring leap seconds) since the Unix epoch, January 1 1970 00:00 UTC.
77     A Tahoe time is the corresponding float."""
78     return long(t) & 0xFFFFFFFFL
79
80
81 def _convert_error(res, request):
82     """If res is not a Failure, return it, otherwise reraise the appropriate
83     SFTPError."""
84
85     if not isinstance(res, Failure):
86         logged_res = res
87         if isinstance(res, str): logged_res = "<data of length %r>" % (len(res),)
88         logmsg("SUCCESS %r %r" % (request, logged_res,), level=OPERATIONAL)
89         return res
90
91     err = res
92     logmsg("RAISE %r %r" % (request, err.value), level=OPERATIONAL)
93     try:
94         if noisy: logmsg(traceback.format_exc(err.value), level=NOISY)
95     except Exception:  # pragma: no cover
96         pass
97
98     # The message argument to SFTPError must not reveal information that
99     # might compromise anonymity, if we are running over an anonymous network.
100
101     if err.check(SFTPError):
102         # original raiser of SFTPError has responsibility to ensure anonymity
103         raise err
104     if err.check(NoSuchChildError):
105         childname = _utf8(err.value.args[0])
106         raise SFTPError(FX_NO_SUCH_FILE, childname)
107     if err.check(NotWriteableError) or err.check(ChildOfWrongTypeError):
108         msg = _utf8(err.value.args[0])
109         raise SFTPError(FX_PERMISSION_DENIED, msg)
110     if err.check(ExistingChildError):
111         # Versions of SFTP after v3 (which is what twisted.conch implements)
112         # define a specific error code for this case: FX_FILE_ALREADY_EXISTS.
113         # However v3 doesn't; instead, other servers such as sshd return
114         # FX_FAILURE. The gvfs SFTP backend, for example, depends on this
115         # to translate the error to the equivalent of POSIX EEXIST, which is
116         # necessary for some picky programs (such as gedit).
117         msg = _utf8(err.value.args[0])
118         raise SFTPError(FX_FAILURE, msg)
119     if err.check(NotImplementedError):
120         raise SFTPError(FX_OP_UNSUPPORTED, _utf8(err.value))
121     if err.check(EOFError):
122         raise SFTPError(FX_EOF, "end of file reached")
123     if err.check(defer.FirstError):
124         _convert_error(err.value.subFailure, request)
125
126     # We assume that the error message is not anonymity-sensitive.
127     raise SFTPError(FX_FAILURE, _utf8(err.value))
128
129
130 def _repr_flags(flags):
131     return "|".join([f for f in
132                      [(flags & FXF_READ)   and "FXF_READ"   or None,
133                       (flags & FXF_WRITE)  and "FXF_WRITE"  or None,
134                       (flags & FXF_APPEND) and "FXF_APPEND" or None,
135                       (flags & FXF_CREAT)  and "FXF_CREAT"  or None,
136                       (flags & FXF_TRUNC)  and "FXF_TRUNC"  or None,
137                       (flags & FXF_EXCL)   and "FXF_EXCL"   or None,
138                      ]
139                      if f])
140
141
142 def _lsLine(name, attrs):
143     st_uid = "tahoe"
144     st_gid = "tahoe"
145     st_mtime = attrs.get("mtime", 0)
146     st_mode = attrs["permissions"]
147
148     # Some clients won't tolerate '?' in the size field (#1337).
149     st_size = attrs.get("size", 0)
150
151     # We don't know how many links there really are to this object.
152     st_nlink = 1
153
154     # Based on <https://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
155     # We previously could not call the version in Twisted because we needed the change
156     # <https://twistedmatrix.com/trac/changeset/25412> (released in Twisted v8.2).
157     # Since we now depend on Twisted v10.1, consider calling Twisted's version.
158
159     mode = st_mode
160     perms = array.array('c', '-'*10)
161     ft = stat.S_IFMT(mode)
162     if   stat.S_ISDIR(ft): perms[0] = 'd'
163     elif stat.S_ISREG(ft): perms[0] = '-'
164     else: perms[0] = '?'
165     # user
166     if mode&stat.S_IRUSR: perms[1] = 'r'
167     if mode&stat.S_IWUSR: perms[2] = 'w'
168     if mode&stat.S_IXUSR: perms[3] = 'x'
169     # group
170     if mode&stat.S_IRGRP: perms[4] = 'r'
171     if mode&stat.S_IWGRP: perms[5] = 'w'
172     if mode&stat.S_IXGRP: perms[6] = 'x'
173     # other
174     if mode&stat.S_IROTH: perms[7] = 'r'
175     if mode&stat.S_IWOTH: perms[8] = 'w'
176     if mode&stat.S_IXOTH: perms[9] = 'x'
177     # suid/sgid never set
178
179     l = perms.tostring()
180     l += str(st_nlink).rjust(5) + ' '
181     un = str(st_uid)
182     l += un.ljust(9)
183     gr = str(st_gid)
184     l += gr.ljust(9)
185     sz = str(st_size)
186     l += sz.rjust(8)
187     l += ' '
188     day = 60 * 60 * 24
189     sixmo = day * 7 * 26
190     now = time()
191     if st_mtime + sixmo < now or st_mtime > now + day:
192         # mtime is more than 6 months ago, or more than one day in the future
193         l += strftime("%b %d  %Y ", localtime(st_mtime))
194     else:
195         l += strftime("%b %d %H:%M ", localtime(st_mtime))
196     l += name
197     return l
198
199
200 def _no_write(parent_readonly, child, metadata=None):
201     """Whether child should be listed as having read-only permissions in parent."""
202
203     if child.is_unknown():
204         return True
205     elif child.is_mutable():
206         return child.is_readonly()
207     elif parent_readonly or IDirectoryNode.providedBy(child):
208         return True
209     else:
210         return metadata is not None and metadata.get('no-write', False)
211
212
213 def _populate_attrs(childnode, metadata, size=None):
214     attrs = {}
215
216     # The permissions must have the S_IFDIR (040000) or S_IFREG (0100000)
217     # bits, otherwise the client may refuse to open a directory.
218     # Also, sshfs run as a non-root user requires files and directories
219     # to be world-readable/writeable.
220     # It is important that we never set the executable bits on files.
221     #
222     # Directories and unknown nodes have no size, and SFTP doesn't
223     # require us to make one up.
224     #
225     # childnode might be None, meaning that the file doesn't exist yet,
226     # but we're going to write it later.
227
228     if childnode and childnode.is_unknown():
229         perms = 0
230     elif childnode and IDirectoryNode.providedBy(childnode):
231         perms = S_IFDIR | 0777
232     else:
233         # For files, omit the size if we don't immediately know it.
234         if childnode and size is None:
235             size = childnode.get_size()
236         if size is not None:
237             _assert(isinstance(size, (int, long)) and not isinstance(size, bool), size=size)
238             attrs['size'] = size
239         perms = S_IFREG | 0666
240
241     if metadata:
242         if metadata.get('no-write', False):
243             perms &= S_IFDIR | S_IFREG | 0555  # clear 'w' bits
244
245         # See webapi.txt for what these times mean.
246         # We would prefer to omit atime, but SFTP version 3 can only
247         # accept mtime if atime is also set.
248         if 'linkmotime' in metadata.get('tahoe', {}):
249             attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['tahoe']['linkmotime'])
250         elif 'mtime' in metadata:
251             attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['mtime'])
252
253         if 'linkcrtime' in metadata.get('tahoe', {}):
254             attrs['createtime'] = _to_sftp_time(metadata['tahoe']['linkcrtime'])
255
256     attrs['permissions'] = perms
257
258     # twisted.conch.ssh.filetransfer only implements SFTP version 3,
259     # which doesn't include SSH_FILEXFER_ATTR_FLAGS.
260
261     return attrs
262
263
264 def _attrs_to_metadata(attrs):
265     metadata = {}
266
267     for key in attrs:
268         if key == "mtime" or key == "ctime" or key == "createtime":
269             metadata[key] = long(attrs[key])
270         elif key.startswith("ext_"):
271             metadata[key] = str(attrs[key])
272
273     perms = attrs.get('permissions', stat.S_IWUSR)
274     if not (perms & stat.S_IWUSR):
275         metadata['no-write'] = True
276
277     return metadata
278
279
280 def _direntry_for(filenode_or_parent, childname, filenode=None):
281     precondition(isinstance(childname, (unicode, NoneType)), childname=childname)
282
283     if childname is None:
284         filenode_or_parent = filenode
285
286     if filenode_or_parent:
287         rw_uri = filenode_or_parent.get_write_uri()
288         if rw_uri and childname:
289             return rw_uri + "/" + childname.encode('utf-8')
290         else:
291             return rw_uri
292
293     return None
294
295
296 class OverwriteableFileConsumer(PrefixingLogMixin):
297     implements(IFinishableConsumer)
298     """I act both as a consumer for the download of the original file contents, and as a
299     wrapper for a temporary file that records the downloaded data and any overwrites.
300     I use a priority queue to keep track of which regions of the file have been overwritten
301     but not yet downloaded, so that the download does not clobber overwritten data.
302     I use another priority queue to record milestones at which to make callbacks
303     indicating that a given number of bytes have been downloaded.
304
305     The temporary file reflects the contents of the file that I represent, except that:
306      - regions that have neither been downloaded nor overwritten, if present,
307        contain garbage.
308      - the temporary file may be shorter than the represented file (it is never longer).
309        The latter's current size is stored in self.current_size.
310
311     This abstraction is mostly independent of SFTP. Consider moving it, if it is found
312     useful for other frontends."""
313
314     def __init__(self, download_size, tempfile_maker):
315         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
316         if noisy: self.log(".__init__(%r, %r)" % (download_size, tempfile_maker), level=NOISY)
317         self.download_size = download_size
318         self.current_size = download_size
319         self.f = tempfile_maker()
320         self.downloaded = 0
321         self.milestones = []  # empty heap of (offset, d)
322         self.overwrites = []  # empty heap of (start, end)
323         self.is_closed = False
324         self.done = self.when_reached(download_size)  # adds a milestone
325         self.is_done = False
326         def _signal_done(ign):
327             if noisy: self.log("DONE", level=NOISY)
328             self.is_done = True
329         self.done.addCallback(_signal_done)
330         self.producer = None
331
332     def get_file(self):
333         return self.f
334
335     def get_current_size(self):
336         return self.current_size
337
338     def set_current_size(self, size):
339         if noisy: self.log(".set_current_size(%r), current_size = %r, downloaded = %r" %
340                            (size, self.current_size, self.downloaded), level=NOISY)
341         if size < self.current_size or size < self.downloaded:
342             self.f.truncate(size)
343         if size > self.current_size:
344             self.overwrite(self.current_size, "\x00" * (size - self.current_size))
345         self.current_size = size
346
347         # make the invariant self.download_size <= self.current_size be true again
348         if size < self.download_size:
349             self.download_size = size
350
351         if self.downloaded >= self.download_size:
352             self.finish()
353
354     def registerProducer(self, p, streaming):
355         if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY)
356         if self.producer is not None:
357             raise RuntimeError("producer is already registered")
358
359         self.producer = p
360         if streaming:
361             # call resumeProducing once to start things off
362             p.resumeProducing()
363         else:
364             def _iterate():
365                 if not self.is_done:
366                     p.resumeProducing()
367                     eventually(_iterate)
368             _iterate()
369
370     def write(self, data):
371         if noisy: self.log(".write(<data of length %r>)" % (len(data),), level=NOISY)
372         if self.is_closed:
373             return
374
375         if self.downloaded >= self.download_size:
376             return
377
378         next_downloaded = self.downloaded + len(data)
379         if next_downloaded > self.download_size:
380             data = data[:(self.download_size - self.downloaded)]
381
382         while len(self.overwrites) > 0:
383             (start, end) = self.overwrites[0]
384             if start >= next_downloaded:
385                 # This and all remaining overwrites are after the data we just downloaded.
386                 break
387             if start > self.downloaded:
388                 # The data we just downloaded has been partially overwritten.
389                 # Write the prefix of it that precedes the overwritten region.
390                 self.f.seek(self.downloaded)
391                 self.f.write(data[:(start - self.downloaded)])
392
393             # This merges consecutive overwrites if possible, which allows us to detect the
394             # case where the download can be stopped early because the remaining region
395             # to download has already been fully overwritten.
396             heapq.heappop(self.overwrites)
397             while len(self.overwrites) > 0:
398                 (start1, end1) = self.overwrites[0]
399                 if start1 > end:
400                     break
401                 end = end1
402                 heapq.heappop(self.overwrites)
403
404             if end >= next_downloaded:
405                 # This overwrite extends past the downloaded data, so there is no
406                 # more data to consider on this call.
407                 heapq.heappush(self.overwrites, (next_downloaded, end))
408                 self._update_downloaded(next_downloaded)
409                 return
410             elif end >= self.downloaded:
411                 data = data[(end - self.downloaded):]
412                 self._update_downloaded(end)
413
414         self.f.seek(self.downloaded)
415         self.f.write(data)
416         self._update_downloaded(next_downloaded)
417
418     def _update_downloaded(self, new_downloaded):
419         self.downloaded = new_downloaded
420         milestone = new_downloaded
421         if len(self.overwrites) > 0:
422             (start, end) = self.overwrites[0]
423             if start <= new_downloaded and end > milestone:
424                 milestone = end
425
426         while len(self.milestones) > 0:
427             (next, d) = self.milestones[0]
428             if next > milestone:
429                 return
430             if noisy: self.log("MILESTONE %r %r" % (next, d), level=NOISY)
431             heapq.heappop(self.milestones)
432             eventually(d.callback, None)
433
434         if milestone >= self.download_size:
435             self.finish()
436
437     def overwrite(self, offset, data):
438         if noisy: self.log(".overwrite(%r, <data of length %r>)" % (offset, len(data)), level=NOISY)
439         if offset > self.current_size:
440             # Normally writing at an offset beyond the current end-of-file
441             # would leave a hole that appears filled with zeroes. However, an
442             # EncryptedTemporaryFile doesn't behave like that (if there is a
443             # hole in the file on disk, the zeroes that are read back will be
444             # XORed with the keystream). So we must explicitly write zeroes in
445             # the gap between the current EOF and the offset.
446
447             self.f.seek(self.current_size)
448             self.f.write("\x00" * (offset - self.current_size))
449             start = self.current_size
450         else:
451             self.f.seek(offset)
452             start = offset
453
454         self.f.write(data)
455         end = offset + len(data)
456         self.current_size = max(self.current_size, end)
457         if end > self.downloaded:
458             heapq.heappush(self.overwrites, (start, end))
459
460     def read(self, offset, length):
461         """When the data has been read, callback the Deferred that we return with this data.
462         Otherwise errback the Deferred that we return.
463         The caller must perform no more overwrites until the Deferred has fired."""
464
465         if noisy: self.log(".read(%r, %r), current_size = %r" % (offset, length, self.current_size), level=NOISY)
466
467         # Note that the overwrite method is synchronous. When a write request is processed
468         # (e.g. a writeChunk request on the async queue of GeneralSFTPFile), overwrite will
469         # be called and will update self.current_size if necessary before returning. Therefore,
470         # self.current_size will be up-to-date for a subsequent call to this read method, and
471         # so it is correct to do the check for a read past the end-of-file here.
472         if offset >= self.current_size:
473             def _eof(): raise EOFError("read past end of file")
474             return defer.execute(_eof)
475
476         if offset + length > self.current_size:
477             length = self.current_size - offset
478             if noisy: self.log("truncating read to %r bytes" % (length,), level=NOISY)
479
480         needed = min(offset + length, self.download_size)
481         d = self.when_reached(needed)
482         def _reached(ign):
483             # It is not necessarily the case that self.downloaded >= needed, because
484             # the file might have been truncated (thus truncating the download) and
485             # then extended.
486
487             _assert(self.current_size >= offset + length,
488                     current_size=self.current_size, offset=offset, length=length)
489             if noisy: self.log("self.f = %r" % (self.f,), level=NOISY)
490             self.f.seek(offset)
491             return self.f.read(length)
492         d.addCallback(_reached)
493         return d
494
495     def when_reached(self, index):
496         if noisy: self.log(".when_reached(%r)" % (index,), level=NOISY)
497         if index <= self.downloaded:  # already reached
498             if noisy: self.log("already reached %r" % (index,), level=NOISY)
499             return defer.succeed(None)
500         d = defer.Deferred()
501         def _reached(ign):
502             if noisy: self.log("reached %r" % (index,), level=NOISY)
503             return ign
504         d.addCallback(_reached)
505         heapq.heappush(self.milestones, (index, d))
506         return d
507
508     def when_done(self):
509         return self.done
510
511     def finish(self):
512         """Called by the producer when it has finished producing, or when we have
513         received enough bytes, or as a result of a close. Defined by IFinishableConsumer."""
514
515         while len(self.milestones) > 0:
516             (next, d) = self.milestones[0]
517             if noisy: self.log("MILESTONE FINISH %r %r" % (next, d), level=NOISY)
518             heapq.heappop(self.milestones)
519             # The callback means that the milestone has been reached if
520             # it is ever going to be. Note that the file may have been
521             # truncated to before the milestone.
522             eventually(d.callback, None)
523
524     def close(self):
525         if not self.is_closed:
526             self.is_closed = True
527             try:
528                 self.f.close()
529             except Exception, e:
530                 self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
531         self.finish()
532
533     def unregisterProducer(self):
534         pass
535
536
537 SIZE_THRESHOLD = 1000
538
539
540 class ShortReadOnlySFTPFile(PrefixingLogMixin):
541     implements(ISFTPFile)
542     """I represent a file handle to a particular file on an SFTP connection.
543     I am used only for short immutable files opened in read-only mode.
544     When I am created, the file contents start to be downloaded to memory.
545     self.async is used to delay read requests until the download has finished."""
546
547     def __init__(self, userpath, filenode, metadata):
548         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
549         if noisy: self.log(".__init__(%r, %r, %r)" % (userpath, filenode, metadata), level=NOISY)
550
551         precondition(isinstance(userpath, str) and IFileNode.providedBy(filenode),
552                      userpath=userpath, filenode=filenode)
553         self.filenode = filenode
554         self.metadata = metadata
555         self.async = download_to_data(filenode)
556         self.closed = False
557
558     def readChunk(self, offset, length):
559         request = ".readChunk(%r, %r)" % (offset, length)
560         self.log(request, level=OPERATIONAL)
561
562         if self.closed:
563             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
564             return defer.execute(_closed)
565
566         d = defer.Deferred()
567         def _read(data):
568             if noisy: self.log("_read(<data of length %r>) in readChunk(%r, %r)" % (len(data), offset, length), level=NOISY)
569
570             # "In response to this request, the server will read as many bytes as it
571             #  can from the file (up to 'len'), and return them in a SSH_FXP_DATA
572             #  message.  If an error occurs or EOF is encountered before reading any
573             #  data, the server will respond with SSH_FXP_STATUS.  For normal disk
574             #  files, it is guaranteed that this will read the specified number of
575             #  bytes, or up to end of file."
576             #
577             # i.e. we respond with an EOF error iff offset is already at EOF.
578
579             if offset >= len(data):
580                 eventually(d.errback, SFTPError(FX_EOF, "read at or past end of file"))
581             else:
582                 eventually(d.callback, data[offset:offset+length])  # truncated if offset+length > len(data)
583             return data
584         self.async.addCallbacks(_read, eventually_errback(d))
585         d.addBoth(_convert_error, request)
586         return d
587
588     def writeChunk(self, offset, data):
589         self.log(".writeChunk(%r, <data of length %r>) denied" % (offset, len(data)), level=OPERATIONAL)
590
591         def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
592         return defer.execute(_denied)
593
594     def close(self):
595         self.log(".close()", level=OPERATIONAL)
596
597         self.closed = True
598         return defer.succeed(None)
599
600     def getAttrs(self):
601         request = ".getAttrs()"
602         self.log(request, level=OPERATIONAL)
603
604         if self.closed:
605             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
606             return defer.execute(_closed)
607
608         d = defer.execute(_populate_attrs, self.filenode, self.metadata)
609         d.addBoth(_convert_error, request)
610         return d
611
612     def setAttrs(self, attrs):
613         self.log(".setAttrs(%r) denied" % (attrs,), level=OPERATIONAL)
614         def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
615         return defer.execute(_denied)
616
617
618 class GeneralSFTPFile(PrefixingLogMixin):
619     implements(ISFTPFile)
620     """I represent a file handle to a particular file on an SFTP connection.
621     I wrap an instance of OverwriteableFileConsumer, which is responsible for
622     storing the file contents. In order to allow write requests to be satisfied
623     immediately, there is effectively a FIFO queue between requests made to this
624     file handle, and requests to my OverwriteableFileConsumer. This queue is
625     implemented by the callback chain of self.async.
626
627     When first constructed, I am in an 'unopened' state that causes most
628     operations to be delayed until 'open' is called."""
629
630     def __init__(self, userpath, flags, close_notify, convergence):
631         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
632         if noisy: self.log(".__init__(%r, %r = %r, %r, <convergence censored>)" %
633                            (userpath, flags, _repr_flags(flags), close_notify), level=NOISY)
634
635         precondition(isinstance(userpath, str), userpath=userpath)
636         self.userpath = userpath
637         self.flags = flags
638         self.close_notify = close_notify
639         self.convergence = convergence
640         self.async = defer.Deferred()
641         # Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created.
642         self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL)
643         self.closed = False
644         self.abandoned = False
645         self.parent = None
646         self.childname = None
647         self.filenode = None
648         self.metadata = None
649
650         # self.consumer should only be relied on in callbacks for self.async, since it might
651         # not be set before then.
652         self.consumer = None
653
654     def open(self, parent=None, childname=None, filenode=None, metadata=None):
655         self.log(".open(parent=%r, childname=%r, filenode=%r, metadata=%r)" %
656                  (parent, childname, filenode, metadata), level=OPERATIONAL)
657
658         precondition(isinstance(childname, (unicode, NoneType)), childname=childname)
659         precondition(filenode is None or IFileNode.providedBy(filenode), filenode=filenode)
660         precondition(not self.closed, sftpfile=self)
661
662         # If the file has been renamed, the new (parent, childname) takes precedence.
663         if self.parent is None:
664             self.parent = parent
665         if self.childname is None:
666             self.childname = childname
667         self.filenode = filenode
668         self.metadata = metadata
669
670         tempfile_maker = EncryptedTemporaryFile
671
672         if (self.flags & FXF_TRUNC) or not filenode:
673             # We're either truncating or creating the file, so we don't need the old contents.
674             self.consumer = OverwriteableFileConsumer(0, tempfile_maker)
675             self.consumer.finish()
676         else:
677             self.async.addCallback(lambda ignored: filenode.get_best_readable_version())
678
679             def _read(version):
680                 if noisy: self.log("_read", level=NOISY)
681                 download_size = version.get_size()
682                 _assert(download_size is not None)
683
684                 self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker)
685
686                 version.read(self.consumer, 0, None)
687             self.async.addCallback(_read)
688
689         eventually(self.async.callback, None)
690
691         if noisy: self.log("open done", level=NOISY)
692         return self
693
694     def get_userpath(self):
695         return self.userpath
696
697     def get_direntry(self):
698         return _direntry_for(self.parent, self.childname)
699
700     def rename(self, new_userpath, new_parent, new_childname):
701         self.log(".rename(%r, %r, %r)" % (new_userpath, new_parent, new_childname), level=OPERATIONAL)
702
703         precondition(isinstance(new_userpath, str) and isinstance(new_childname, unicode),
704                      new_userpath=new_userpath, new_childname=new_childname)
705         self.userpath = new_userpath
706         self.parent = new_parent
707         self.childname = new_childname
708
709     def abandon(self):
710         self.log(".abandon()", level=OPERATIONAL)
711
712         self.abandoned = True
713
714     def sync(self, ign=None):
715         # The ign argument allows some_file.sync to be used as a callback.
716         self.log(".sync()", level=OPERATIONAL)
717
718         d = defer.Deferred()
719         self.async.addBoth(eventually_callback(d))
720         def _done(res):
721             if noisy: self.log("_done(%r) in .sync()" % (res,), level=NOISY)
722             return res
723         d.addBoth(_done)
724         return d
725
726     def readChunk(self, offset, length):
727         request = ".readChunk(%r, %r)" % (offset, length)
728         self.log(request, level=OPERATIONAL)
729
730         if not (self.flags & FXF_READ):
731             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for reading")
732             return defer.execute(_denied)
733
734         if self.closed:
735             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
736             return defer.execute(_closed)
737
738         d = defer.Deferred()
739         def _read(ign):
740             if noisy: self.log("_read in readChunk(%r, %r)" % (offset, length), level=NOISY)
741             d2 = self.consumer.read(offset, length)
742             d2.addCallbacks(eventually_callback(d), eventually_errback(d))
743             # It is correct to drop d2 here.
744             return None
745         self.async.addCallbacks(_read, eventually_errback(d))
746         d.addBoth(_convert_error, request)
747         return d
748
749     def writeChunk(self, offset, data):
750         self.log(".writeChunk(%r, <data of length %r>)" % (offset, len(data)), level=OPERATIONAL)
751
752         if not (self.flags & FXF_WRITE):
753             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
754             return defer.execute(_denied)
755
756         if self.closed:
757             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")
758             return defer.execute(_closed)
759
760         self.has_changed = True
761
762         # Note that we return without waiting for the write to occur. Reads and
763         # close wait for prior writes, and will fail if any prior operation failed.
764         # This is ok because SFTP makes no guarantee that the write completes
765         # before the request does. In fact it explicitly allows write errors to be
766         # delayed until close:
767         #   "One should note that on some server platforms even a close can fail.
768         #    This can happen e.g. if the server operating system caches writes,
769         #    and an error occurs while flushing cached writes during the close."
770
771         def _write(ign):
772             if noisy: self.log("_write in .writeChunk(%r, <data of length %r>), current_size = %r" %
773                                (offset, len(data), self.consumer.get_current_size()), level=NOISY)
774             # FXF_APPEND means that we should always write at the current end of file.
775             write_offset = offset
776             if self.flags & FXF_APPEND:
777                 write_offset = self.consumer.get_current_size()
778
779             self.consumer.overwrite(write_offset, data)
780             if noisy: self.log("overwrite done", level=NOISY)
781             return None
782         self.async.addCallback(_write)
783         # don't addErrback to self.async, just allow subsequent async ops to fail.
784         return defer.succeed(None)
785
786     def close(self):
787         request = ".close()"
788         self.log(request, level=OPERATIONAL)
789
790         if self.closed:
791             return defer.succeed(None)
792
793         # This means that close has been called, not that the close has succeeded.
794         self.closed = True
795
796         if not (self.flags & (FXF_WRITE | FXF_CREAT)):
797             def _readonly_close():
798                 if self.consumer:
799                     self.consumer.close()
800             return defer.execute(_readonly_close)
801
802         # We must capture the abandoned, parent, and childname variables synchronously
803         # at the close call. This is needed by the correctness arguments in the comments
804         # for _abandon_any_heisenfiles and _rename_heisenfiles.
805         # Note that the file must have been opened before it can be closed.
806         abandoned = self.abandoned
807         parent = self.parent
808         childname = self.childname
809
810         # has_changed is set when writeChunk is called, not when the write occurs, so
811         # it is correct to optimize out the commit if it is False at the close call.
812         has_changed = self.has_changed
813
814         def _committed(res):
815             if noisy: self.log("_committed(%r)" % (res,), level=NOISY)
816
817             self.consumer.close()
818
819             # We must close_notify before re-firing self.async.
820             if self.close_notify:
821                 self.close_notify(self.userpath, self.parent, self.childname, self)
822             return res
823
824         def _close(ign):
825             d2 = self.consumer.when_done()
826             if self.filenode and self.filenode.is_mutable():
827                 self.log("update mutable file %r childname=%r metadata=%r" % (self.filenode, childname, self.metadata), level=OPERATIONAL)
828                 if self.metadata.get('no-write', False) and not self.filenode.is_readonly():
829                     _assert(parent and childname, parent=parent, childname=childname, metadata=self.metadata)
830                     d2.addCallback(lambda ign: parent.set_metadata_for(childname, self.metadata))
831
832                 d2.addCallback(lambda ign: self.filenode.overwrite(MutableFileHandle(self.consumer.get_file())))
833             else:
834                 def _add_file(ign):
835                     self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
836                     u = FileHandle(self.consumer.get_file(), self.convergence)
837                     return parent.add_file(childname, u, metadata=self.metadata)
838                 d2.addCallback(_add_file)
839
840             d2.addBoth(_committed)
841             return d2
842
843         d = defer.Deferred()
844
845         # If the file has been abandoned, we don't want the close operation to get "stuck",
846         # even if self.async fails to re-fire. Doing the close independently of self.async
847         # in that case ensures that dropping an ssh connection is sufficient to abandon
848         # any heisenfiles that were not explicitly closed in that connection.
849         if abandoned or not has_changed:
850             d.addCallback(_committed)
851         else:
852             self.async.addCallback(_close)
853
854         self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
855         d.addBoth(_convert_error, request)
856         return d
857
858     def getAttrs(self):
859         request = ".getAttrs()"
860         self.log(request, level=OPERATIONAL)
861
862         if self.closed:
863             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
864             return defer.execute(_closed)
865
866         # Optimization for read-only handles, when we already know the metadata.
867         if not (self.flags & (FXF_WRITE | FXF_CREAT)) and self.metadata and self.filenode and not self.filenode.is_mutable():
868             return defer.succeed(_populate_attrs(self.filenode, self.metadata))
869
870         d = defer.Deferred()
871         def _get(ign):
872             if noisy: self.log("_get(%r) in %r, filenode = %r, metadata = %r" % (ign, request, self.filenode, self.metadata), level=NOISY)
873
874             # self.filenode might be None, but that's ok.
875             attrs = _populate_attrs(self.filenode, self.metadata, size=self.consumer.get_current_size())
876             eventually(d.callback, attrs)
877             return None
878         self.async.addCallbacks(_get, eventually_errback(d))
879         d.addBoth(_convert_error, request)
880         return d
881
882     def setAttrs(self, attrs, only_if_at=None):
883         request = ".setAttrs(%r, only_if_at=%r)" % (attrs, only_if_at)
884         self.log(request, level=OPERATIONAL)
885
886         if not (self.flags & FXF_WRITE):
887             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
888             return defer.execute(_denied)
889
890         if self.closed:
891             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot set attributes for a closed file handle")
892             return defer.execute(_closed)
893
894         size = attrs.get("size", None)
895         if size is not None and (not isinstance(size, (int, long)) or size < 0):
896             def _bad(): raise SFTPError(FX_BAD_MESSAGE, "new size is not a valid nonnegative integer")
897             return defer.execute(_bad)
898
899         d = defer.Deferred()
900         def _set(ign):
901             if noisy: self.log("_set(%r) in %r" % (ign, request), level=NOISY)
902             current_direntry = _direntry_for(self.parent, self.childname, self.filenode)
903             if only_if_at and only_if_at != current_direntry:
904                 if noisy: self.log("not setting attributes: current_direntry=%r in %r" %
905                                    (current_direntry, request), level=NOISY)
906                 return None
907
908             now = time()
909             self.metadata = update_metadata(self.metadata, _attrs_to_metadata(attrs), now)
910             if size is not None:
911                 # TODO: should we refuse to truncate a file opened with FXF_APPEND?
912                 # <http://allmydata.org/trac/tahoe-lafs/ticket/1037#comment:20>
913                 self.consumer.set_current_size(size)
914             eventually(d.callback, None)
915             return None
916         self.async.addCallbacks(_set, eventually_errback(d))
917         d.addBoth(_convert_error, request)
918         return d
919
920
921 class StoppableList:
922     def __init__(self, items):
923         self.items = items
924     def __iter__(self):
925         for i in self.items:
926             yield i
927     def close(self):
928         pass
929
930
931 class Reason:
932     def __init__(self, value):
933         self.value = value
934
935
936 # A "heisenfile" is a file that has been opened with write flags
937 # (FXF_WRITE and/or FXF_CREAT) and not yet close-notified.
938 # 'all_heisenfiles' maps from a direntry string to a list of
939 # GeneralSFTPFile.
940 #
941 # A direntry string is parent_write_uri + "/" + childname_utf8 for
942 # an immutable file, or file_write_uri for a mutable file.
943 # Updates to this dict are single-threaded.
944
945 all_heisenfiles = {}
946
947 def _reload():
948     global all_heisenfiles
949     all_heisenfiles = {}
950
951 class SFTPUserHandler(ConchUser, PrefixingLogMixin):
952     implements(ISFTPServer)
953     def __init__(self, client, rootnode, username):
954         ConchUser.__init__(self)
955         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=username)
956         if noisy: self.log(".__init__(%r, %r, %r)" % (client, rootnode, username), level=NOISY)
957
958         self.channelLookup["session"] = session.SSHSession
959         self.subsystemLookup["sftp"] = FileTransferServer
960
961         self._client = client
962         self._root = rootnode
963         self._username = username
964         self._convergence = client.convergence
965
966         # maps from UTF-8 paths for this user, to files written and still open
967         self._heisenfiles = {}
968
969     def gotVersion(self, otherVersion, extData):
970         self.log(".gotVersion(%r, %r)" % (otherVersion, extData), level=OPERATIONAL)
971
972         # advertise the same extensions as the OpenSSH SFTP server
973         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
974         return {'posix-rename@openssh.com': '1',
975                 'statvfs@openssh.com': '2',
976                 'fstatvfs@openssh.com': '2',
977                }
978
979     def logout(self):
980         self.log(".logout()", level=OPERATIONAL)
981
982         for files in self._heisenfiles.itervalues():
983             for f in files:
984                 f.abandon()
985
986     def _add_heisenfile_by_path(self, file):
987         self.log("._add_heisenfile_by_path(%r)" % (file,), level=OPERATIONAL)
988
989         userpath = file.get_userpath()
990         if userpath in self._heisenfiles:
991             self._heisenfiles[userpath] += [file]
992         else:
993             self._heisenfiles[userpath] = [file]
994
995     def _add_heisenfile_by_direntry(self, file):
996         self.log("._add_heisenfile_by_direntry(%r)" % (file,), level=OPERATIONAL)
997
998         direntry = file.get_direntry()
999         if direntry:
1000             if direntry in all_heisenfiles:
1001                 all_heisenfiles[direntry] += [file]
1002             else:
1003                 all_heisenfiles[direntry] = [file]
1004
1005     def _abandon_any_heisenfiles(self, userpath, direntry):
1006         request = "._abandon_any_heisenfiles(%r, %r)" % (userpath, direntry)
1007         self.log(request, level=OPERATIONAL)
1008
1009         precondition(isinstance(userpath, str), userpath=userpath)
1010
1011         # First we synchronously mark all heisenfiles matching the userpath or direntry
1012         # as abandoned, and remove them from the two heisenfile dicts. Then we .sync()
1013         # each file that we abandoned.
1014         #
1015         # For each file, the call to .abandon() occurs:
1016         #   * before the file is closed, in which case it will never be committed
1017         #     (uploaded+linked or published); or
1018         #   * after it is closed but before it has been close_notified, in which case the
1019         #     .sync() ensures that it has been committed (successfully or not) before we
1020         #     return.
1021         #
1022         # This avoids a race that might otherwise cause the file to be committed after
1023         # the remove operation has completed.
1024         #
1025         # We return a Deferred that fires with True if any files were abandoned (this
1026         # does not mean that they were not committed; it is used to determine whether
1027         # a NoSuchChildError from the attempt to delete the file should be suppressed).
1028
1029         files = []
1030         if direntry in all_heisenfiles:
1031             files = all_heisenfiles[direntry]
1032             del all_heisenfiles[direntry]
1033         if userpath in self._heisenfiles:
1034             files += self._heisenfiles[userpath]
1035             del self._heisenfiles[userpath]
1036
1037         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1038
1039         for f in files:
1040             f.abandon()
1041
1042         d = defer.succeed(None)
1043         for f in files:
1044             d.addBoth(f.sync)
1045
1046         def _done(ign):
1047             self.log("done %r" % (request,), level=OPERATIONAL)
1048             return len(files) > 0
1049         d.addBoth(_done)
1050         return d
1051
1052     def _rename_heisenfiles(self, from_userpath, from_parent, from_childname,
1053                             to_userpath, to_parent, to_childname, overwrite=True):
1054         request = ("._rename_heisenfiles(%r, %r, %r, %r, %r, %r, overwrite=%r)" %
1055                    (from_userpath, from_parent, from_childname, to_userpath, to_parent, to_childname, overwrite))
1056         self.log(request, level=OPERATIONAL)
1057
1058         precondition((isinstance(from_userpath, str) and isinstance(from_childname, unicode) and
1059                       isinstance(to_userpath, str) and isinstance(to_childname, unicode)),
1060                      from_userpath=from_userpath, from_childname=from_childname, to_userpath=to_userpath, to_childname=to_childname)
1061
1062         if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
1063
1064         # First we synchronously rename all heisenfiles matching the userpath or direntry.
1065         # Then we .sync() each file that we renamed.
1066         #
1067         # For each file, the call to .rename occurs:
1068         #   * before the file is closed, in which case it will be committed at the
1069         #     new direntry; or
1070         #   * after it is closed but before it has been close_notified, in which case the
1071         #     .sync() ensures that it has been committed (successfully or not) before we
1072         #     return.
1073         #
1074         # This avoids a race that might otherwise cause the file to be committed at the
1075         # old name after the rename operation has completed.
1076         #
1077         # Note that if overwrite is False, the caller should already have checked
1078         # whether a real direntry exists at the destination. It is possible that another
1079         # direntry (heisen or real) comes to exist at the destination after that check,
1080         # but in that case it is correct for the rename to succeed (and for the commit
1081         # of the heisenfile at the destination to possibly clobber the other entry, since
1082         # that can happen anyway when we have concurrent write handles to the same direntry).
1083         #
1084         # We return a Deferred that fires with True if any files were renamed (this
1085         # does not mean that they were not committed; it is used to determine whether
1086         # a NoSuchChildError from the rename attempt should be suppressed). If overwrite
1087         # is False and there were already heisenfiles at the destination userpath or
1088         # direntry, we return a Deferred that fails with SFTPError(FX_PERMISSION_DENIED).
1089
1090         from_direntry = _direntry_for(from_parent, from_childname)
1091         to_direntry = _direntry_for(to_parent, to_childname)
1092
1093         if noisy: self.log("from_direntry = %r, to_direntry = %r, len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
1094                            (from_direntry, to_direntry, len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
1095
1096         if not overwrite and (to_userpath in self._heisenfiles or to_direntry in all_heisenfiles):
1097             def _existing(): raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1098             if noisy: self.log("existing", level=NOISY)
1099             return defer.execute(_existing)
1100
1101         from_files = []
1102         if from_direntry in all_heisenfiles:
1103             from_files = all_heisenfiles[from_direntry]
1104             del all_heisenfiles[from_direntry]
1105         if from_userpath in self._heisenfiles:
1106             from_files += self._heisenfiles[from_userpath]
1107             del self._heisenfiles[from_userpath]
1108
1109         if noisy: self.log("from_files = %r in %r" % (from_files, request), level=NOISY)
1110
1111         for f in from_files:
1112             f.rename(to_userpath, to_parent, to_childname)
1113             self._add_heisenfile_by_path(f)
1114             self._add_heisenfile_by_direntry(f)
1115
1116         d = defer.succeed(None)
1117         for f in from_files:
1118             d.addBoth(f.sync)
1119
1120         def _done(ign):
1121             if noisy: self.log("done: len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
1122                                (len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
1123             return len(from_files) > 0
1124         d.addBoth(_done)
1125         return d
1126
1127     def _update_attrs_for_heisenfiles(self, userpath, direntry, attrs):
1128         request = "._update_attrs_for_heisenfiles(%r, %r, %r)" % (userpath, direntry, attrs)
1129         self.log(request, level=OPERATIONAL)
1130
1131         _assert(isinstance(userpath, str) and isinstance(direntry, str),
1132                 userpath=userpath, direntry=direntry)
1133
1134         files = []
1135         if direntry in all_heisenfiles:
1136             files = all_heisenfiles[direntry]
1137         if userpath in self._heisenfiles:
1138             files += self._heisenfiles[userpath]
1139
1140         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1141
1142         # We set the metadata for all heisenfiles at this path or direntry.
1143         # Since a direntry includes a write URI, we must have authority to
1144         # change the metadata of heisenfiles found in the all_heisenfiles dict.
1145         # However that's not necessarily the case for heisenfiles found by
1146         # path. Therefore we tell the setAttrs method of each file to only
1147         # perform the update if the file is at the correct direntry.
1148
1149         d = defer.succeed(None)
1150         for f in files:
1151             d.addBoth(f.setAttrs, attrs, only_if_at=direntry)
1152
1153         def _done(ign):
1154             self.log("done %r" % (request,), level=OPERATIONAL)
1155             # TODO: this should not return True if only_if_at caused all files to be skipped.
1156             return len(files) > 0
1157         d.addBoth(_done)
1158         return d
1159
1160     def _sync_heisenfiles(self, userpath, direntry, ignore=None):
1161         request = "._sync_heisenfiles(%r, %r, ignore=%r)" % (userpath, direntry, ignore)
1162         self.log(request, level=OPERATIONAL)
1163
1164         _assert(isinstance(userpath, str) and isinstance(direntry, (str, NoneType)),
1165                 userpath=userpath, direntry=direntry)
1166
1167         files = []
1168         if direntry in all_heisenfiles:
1169             files = all_heisenfiles[direntry]
1170         if userpath in self._heisenfiles:
1171             files += self._heisenfiles[userpath]
1172
1173         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1174
1175         d = defer.succeed(None)
1176         for f in files:
1177             if f is not ignore:
1178                 d.addBoth(f.sync)
1179
1180         def _done(ign):
1181             self.log("done %r" % (request,), level=OPERATIONAL)
1182             return None
1183         d.addBoth(_done)
1184         return d
1185
1186     def _remove_heisenfile(self, userpath, parent, childname, file_to_remove):
1187         if noisy: self.log("._remove_heisenfile(%r, %r, %r, %r)" % (userpath, parent, childname, file_to_remove), level=NOISY)
1188
1189         _assert(isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)),
1190                 userpath=userpath, childname=childname)
1191
1192         direntry = _direntry_for(parent, childname)
1193         if direntry in all_heisenfiles:
1194             all_old_files = all_heisenfiles[direntry]
1195             all_new_files = [f for f in all_old_files if f is not file_to_remove]
1196             if len(all_new_files) > 0:
1197                 all_heisenfiles[direntry] = all_new_files
1198             else:
1199                 del all_heisenfiles[direntry]
1200
1201         if userpath in self._heisenfiles:
1202             old_files = self._heisenfiles[userpath]
1203             new_files = [f for f in old_files if f is not file_to_remove]
1204             if len(new_files) > 0:
1205                 self._heisenfiles[userpath] = new_files
1206             else:
1207                 del self._heisenfiles[userpath]
1208
1209         if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
1210
1211     def _make_file(self, existing_file, userpath, flags, parent=None, childname=None, filenode=None, metadata=None):
1212         if noisy: self.log("._make_file(%r, %r, %r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
1213                            (existing_file, userpath, flags, _repr_flags(flags), parent, childname, filenode, metadata),
1214                            level=NOISY)
1215
1216         _assert((isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)) and
1217                 (metadata is None or 'no-write' in metadata)),
1218                 userpath=userpath, childname=childname, metadata=metadata)
1219
1220         writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
1221         direntry = _direntry_for(parent, childname, filenode)
1222
1223         d = self._sync_heisenfiles(userpath, direntry, ignore=existing_file)
1224
1225         if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
1226             d.addCallback(lambda ign: ShortReadOnlySFTPFile(userpath, filenode, metadata))
1227         else:
1228             close_notify = None
1229             if writing:
1230                 close_notify = self._remove_heisenfile
1231
1232             d.addCallback(lambda ign: existing_file or GeneralSFTPFile(userpath, flags, close_notify, self._convergence))
1233             def _got_file(file):
1234                 file.open(parent=parent, childname=childname, filenode=filenode, metadata=metadata)
1235                 if writing:
1236                     self._add_heisenfile_by_direntry(file)
1237                 return file
1238             d.addCallback(_got_file)
1239         return d
1240
1241     def openFile(self, pathstring, flags, attrs, delay=None):
1242         request = ".openFile(%r, %r = %r, %r, delay=%r)" % (pathstring, flags, _repr_flags(flags), attrs, delay)
1243         self.log(request, level=OPERATIONAL)
1244
1245         # This is used for both reading and writing.
1246         # First exclude invalid combinations of flags, and empty paths.
1247
1248         if not (flags & (FXF_READ | FXF_WRITE)):
1249             def _bad_readwrite():
1250                 raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
1251             return defer.execute(_bad_readwrite)
1252
1253         if (flags & FXF_EXCL) and not (flags & FXF_CREAT):
1254             def _bad_exclcreat():
1255                 raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT")
1256             return defer.execute(_bad_exclcreat)
1257
1258         path = self._path_from_string(pathstring)
1259         if not path:
1260             def _emptypath(): raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty")
1261             return defer.execute(_emptypath)
1262
1263         # The combination of flags is potentially valid.
1264
1265         # To work around clients that have race condition bugs, a getAttr, rename, or
1266         # remove request following an 'open' request with FXF_WRITE or FXF_CREAT flags,
1267         # should succeed even if the 'open' request has not yet completed. So we now
1268         # synchronously add a file object into the self._heisenfiles dict, indexed
1269         # by its UTF-8 userpath. (We can't yet add it to the all_heisenfiles dict,
1270         # because we don't yet have a user-independent path for the file.) The file
1271         # object does not know its filenode, parent, or childname at this point.
1272
1273         userpath = self._path_to_utf8(path)
1274
1275         if flags & (FXF_WRITE | FXF_CREAT):
1276             file = GeneralSFTPFile(userpath, flags, self._remove_heisenfile, self._convergence)
1277             self._add_heisenfile_by_path(file)
1278         else:
1279             # We haven't decided which file implementation to use yet.
1280             file = None
1281
1282         desired_metadata = _attrs_to_metadata(attrs)
1283
1284         # Now there are two major cases:
1285         #
1286         #  1. The path is specified as /uri/FILECAP, with no parent directory.
1287         #     If the FILECAP is mutable and writeable, then we can open it in write-only
1288         #     or read/write mode (non-exclusively), otherwise we can only open it in
1289         #     read-only mode. The open should succeed immediately as long as FILECAP is
1290         #     a valid known filecap that grants the required permission.
1291         #
1292         #  2. The path is specified relative to a parent. We find the parent dirnode and
1293         #     get the child's URI and metadata if it exists. There are four subcases:
1294         #       a. the child does not exist: FXF_CREAT must be set, and we must be able
1295         #          to write to the parent directory.
1296         #       b. the child exists but is not a valid known filecap: fail
1297         #       c. the child is mutable: if we are trying to open it write-only or
1298         #          read/write, then we must be able to write to the file.
1299         #       d. the child is immutable: if we are trying to open it write-only or
1300         #          read/write, then we must be able to write to the parent directory.
1301         #
1302         # To reduce latency, open normally succeeds as soon as these conditions are
1303         # met, even though there might be a failure in downloading the existing file
1304         # or uploading a new one. However, there is an exception: if a file has been
1305         # written, then closed, and is now being reopened, then we have to delay the
1306         # open until the previous upload/publish has completed. This is necessary
1307         # because sshfs does not wait for the result of an FXF_CLOSE message before
1308         # reporting to the client that a file has been closed. It applies both to
1309         # mutable files, and to directory entries linked to an immutable file.
1310         #
1311         # Note that the permission checks below are for more precise error reporting on
1312         # the open call; later operations would fail even if we did not make these checks.
1313
1314         d = delay or defer.succeed(None)
1315         d.addCallback(lambda ign: self._get_root(path))
1316         def _got_root( (root, path) ):
1317             if root.is_unknown():
1318                 raise SFTPError(FX_PERMISSION_DENIED,
1319                                 "cannot open an unknown cap (or child of an unknown object). "
1320                                 "Upgrading the gateway to a later Tahoe-LAFS version may help")
1321             if not path:
1322                 # case 1
1323                 if noisy: self.log("case 1: root = %r, path[:-1] = %r" % (root, path[:-1]), level=NOISY)
1324                 if not IFileNode.providedBy(root):
1325                     raise SFTPError(FX_PERMISSION_DENIED,
1326                                     "cannot open a directory cap")
1327                 if (flags & FXF_WRITE) and root.is_readonly():
1328                     raise SFTPError(FX_PERMISSION_DENIED,
1329                                     "cannot write to a non-writeable filecap without a parent directory")
1330                 if flags & FXF_EXCL:
1331                     raise SFTPError(FX_FAILURE,
1332                                     "cannot create a file exclusively when it already exists")
1333
1334                 # The file does not need to be added to all_heisenfiles, because it is not
1335                 # associated with a directory entry that needs to be updated.
1336
1337                 metadata = update_metadata(None, desired_metadata, time())
1338
1339                 # We have to decide what to pass for the 'parent_readonly' argument to _no_write,
1340                 # given that we don't actually have a parent. This only affects the permissions
1341                 # reported by a getAttrs on this file handle in the case of an immutable file.
1342                 # We choose 'parent_readonly=True' since that will cause the permissions to be
1343                 # reported as r--r--r--, which is appropriate because an immutable file can't be
1344                 # written via this path.
1345
1346                 metadata['no-write'] = _no_write(True, root)
1347                 return self._make_file(file, userpath, flags, filenode=root, metadata=metadata)
1348             else:
1349                 # case 2
1350                 childname = path[-1]
1351
1352                 if noisy: self.log("case 2: root = %r, childname = %r, desired_metadata = %r, path[:-1] = %r" %
1353                                    (root, childname, desired_metadata, path[:-1]), level=NOISY)
1354                 d2 = root.get_child_at_path(path[:-1])
1355                 def _got_parent(parent):
1356                     if noisy: self.log("_got_parent(%r)" % (parent,), level=NOISY)
1357                     if parent.is_unknown():
1358                         raise SFTPError(FX_PERMISSION_DENIED,
1359                                         "cannot open a child of an unknown object. "
1360                                         "Upgrading the gateway to a later Tahoe-LAFS version may help")
1361
1362                     parent_readonly = parent.is_readonly()
1363                     d3 = defer.succeed(None)
1364                     if flags & FXF_EXCL:
1365                         # FXF_EXCL means that the link to the file (not the file itself) must
1366                         # be created atomically wrt updates by this storage client.
1367                         # That is, we need to create the link before returning success to the
1368                         # SFTP open request (and not just on close, as would normally be the
1369                         # case). We make the link initially point to a zero-length LIT file,
1370                         # which is consistent with what might happen on a POSIX filesystem.
1371
1372                         if parent_readonly:
1373                             raise SFTPError(FX_FAILURE,
1374                                             "cannot create a file exclusively when the parent directory is read-only")
1375
1376                         # 'overwrite=False' ensures failure if the link already exists.
1377                         # FIXME: should use a single call to set_uri and return (child, metadata) (#1035)
1378
1379                         zero_length_lit = "URI:LIT:"
1380                         if noisy: self.log("%r.set_uri(%r, None, readcap=%r, overwrite=False)" %
1381                                            (parent, zero_length_lit, childname), level=NOISY)
1382                         d3.addCallback(lambda ign: parent.set_uri(childname, None, readcap=zero_length_lit,
1383                                                                   metadata=desired_metadata, overwrite=False))
1384                         def _seturi_done(child):
1385                             if noisy: self.log("%r.get_metadata_for(%r)" % (parent, childname), level=NOISY)
1386                             d4 = parent.get_metadata_for(childname)
1387                             d4.addCallback(lambda metadata: (child, metadata))
1388                             return d4
1389                         d3.addCallback(_seturi_done)
1390                     else:
1391                         if noisy: self.log("%r.get_child_and_metadata(%r)" % (parent, childname), level=NOISY)
1392                         d3.addCallback(lambda ign: parent.get_child_and_metadata(childname))
1393
1394                     def _got_child( (filenode, current_metadata) ):
1395                         if noisy: self.log("_got_child( (%r, %r) )" % (filenode, current_metadata), level=NOISY)
1396
1397                         metadata = update_metadata(current_metadata, desired_metadata, time())
1398
1399                         # Ignore the permissions of the desired_metadata in an open call. The permissions
1400                         # can only be set by setAttrs.
1401                         metadata['no-write'] = _no_write(parent_readonly, filenode, current_metadata)
1402
1403                         if filenode.is_unknown():
1404                             raise SFTPError(FX_PERMISSION_DENIED,
1405                                             "cannot open an unknown cap. Upgrading the gateway "
1406                                             "to a later Tahoe-LAFS version may help")
1407                         if not IFileNode.providedBy(filenode):
1408                             raise SFTPError(FX_PERMISSION_DENIED,
1409                                             "cannot open a directory as if it were a file")
1410                         if (flags & FXF_WRITE) and metadata['no-write']:
1411                             raise SFTPError(FX_PERMISSION_DENIED,
1412                                             "cannot open a non-writeable file for writing")
1413
1414                         return self._make_file(file, userpath, flags, parent=parent, childname=childname,
1415                                                filenode=filenode, metadata=metadata)
1416                     def _no_child(f):
1417                         if noisy: self.log("_no_child(%r)" % (f,), level=NOISY)
1418                         f.trap(NoSuchChildError)
1419
1420                         if not (flags & FXF_CREAT):
1421                             raise SFTPError(FX_NO_SUCH_FILE,
1422                                             "the file does not exist, and was not opened with the creation (CREAT) flag")
1423                         if parent_readonly:
1424                             raise SFTPError(FX_PERMISSION_DENIED,
1425                                             "cannot create a file when the parent directory is read-only")
1426
1427                         return self._make_file(file, userpath, flags, parent=parent, childname=childname)
1428                     d3.addCallbacks(_got_child, _no_child)
1429                     return d3
1430
1431                 d2.addCallback(_got_parent)
1432                 return d2
1433
1434         d.addCallback(_got_root)
1435         def _remove_on_error(err):
1436             if file:
1437                 self._remove_heisenfile(userpath, None, None, file)
1438             return err
1439         d.addErrback(_remove_on_error)
1440         d.addBoth(_convert_error, request)
1441         return d
1442
1443     def renameFile(self, from_pathstring, to_pathstring, overwrite=False):
1444         request = ".renameFile(%r, %r)" % (from_pathstring, to_pathstring)
1445         self.log(request, level=OPERATIONAL)
1446
1447         from_path = self._path_from_string(from_pathstring)
1448         to_path = self._path_from_string(to_pathstring)
1449         from_userpath = self._path_to_utf8(from_path)
1450         to_userpath = self._path_to_utf8(to_path)
1451
1452         # the target directory must already exist
1453         d = deferredutil.gatherResults([self._get_parent_or_node(from_path),
1454                                         self._get_parent_or_node(to_path)])
1455         def _got( (from_pair, to_pair) ):
1456             if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r, overwrite=%r)" %
1457                                (from_pair, to_pair, from_pathstring, to_pathstring, overwrite), level=NOISY)
1458             (from_parent, from_childname) = from_pair
1459             (to_parent, to_childname) = to_pair
1460
1461             if from_childname is None:
1462                 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename a source object specified by URI")
1463             if to_childname is None:
1464                 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename to a destination specified by URI")
1465
1466             # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.5>
1467             # "It is an error if there already exists a file with the name specified
1468             #  by newpath."
1469             # OpenSSH's SFTP server returns FX_PERMISSION_DENIED for this error.
1470             #
1471             # For the standard SSH_FXP_RENAME operation, overwrite=False.
1472             # We also support the posix-rename@openssh.com extension, which uses overwrite=True.
1473
1474             d2 = defer.succeed(None)
1475             if not overwrite:
1476                 d2.addCallback(lambda ign: to_parent.get(to_childname))
1477                 def _expect_fail(res):
1478                     if not isinstance(res, Failure):
1479                         raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1480
1481                     # It is OK if we fail for errors other than NoSuchChildError, since that probably
1482                     # indicates some problem accessing the destination directory.
1483                     res.trap(NoSuchChildError)
1484                 d2.addBoth(_expect_fail)
1485
1486             # If there are heisenfiles to be written at the 'from' direntry, then ensure
1487             # they will now be written at the 'to' direntry instead.
1488             d2.addCallback(lambda ign:
1489                            self._rename_heisenfiles(from_userpath, from_parent, from_childname,
1490                                                     to_userpath, to_parent, to_childname, overwrite=overwrite))
1491
1492             def _move(renamed):
1493                 # FIXME: use move_child_to_path to avoid possible data loss due to #943
1494                 #d3 = from_parent.move_child_to_path(from_childname, to_root, to_path, overwrite=overwrite)
1495
1496                 d3 = from_parent.move_child_to(from_childname, to_parent, to_childname, overwrite=overwrite)
1497                 def _check(err):
1498                     if noisy: self.log("_check(%r) in .renameFile(%r, %r, overwrite=%r)" %
1499                                        (err, from_pathstring, to_pathstring, overwrite), level=NOISY)
1500
1501                     if not isinstance(err, Failure) or (renamed and err.check(NoSuchChildError)):
1502                         return None
1503                     if not overwrite and err.check(ExistingChildError):
1504                         raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1505
1506                     return err
1507                 d3.addBoth(_check)
1508                 return d3
1509             d2.addCallback(_move)
1510             return d2
1511         d.addCallback(_got)
1512         d.addBoth(_convert_error, request)
1513         return d
1514
1515     def makeDirectory(self, pathstring, attrs):
1516         request = ".makeDirectory(%r, %r)" % (pathstring, attrs)
1517         self.log(request, level=OPERATIONAL)
1518
1519         path = self._path_from_string(pathstring)
1520         metadata = _attrs_to_metadata(attrs)
1521         if 'no-write' in metadata:
1522             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "cannot create a directory that is initially read-only")
1523             return defer.execute(_denied)
1524
1525         d = self._get_root(path)
1526         d.addCallback(lambda (root, path):
1527                       self._get_or_create_directories(root, path, metadata))
1528         d.addBoth(_convert_error, request)
1529         return d
1530
1531     def _get_or_create_directories(self, node, path, metadata):
1532         if not IDirectoryNode.providedBy(node):
1533             # TODO: provide the name of the blocking file in the error message.
1534             def _blocked(): raise SFTPError(FX_FAILURE, "cannot create directory because there "
1535                                                         "is a file in the way") # close enough
1536             return defer.execute(_blocked)
1537
1538         if not path:
1539             return defer.succeed(node)
1540         d = node.get(path[0])
1541         def _maybe_create(f):
1542             f.trap(NoSuchChildError)
1543             return node.create_subdirectory(path[0])
1544         d.addErrback(_maybe_create)
1545         d.addCallback(self._get_or_create_directories, path[1:], metadata)
1546         return d
1547
1548     def removeFile(self, pathstring):
1549         request = ".removeFile(%r)" % (pathstring,)
1550         self.log(request, level=OPERATIONAL)
1551
1552         path = self._path_from_string(pathstring)
1553         d = self._remove_object(path, must_be_file=True)
1554         d.addBoth(_convert_error, request)
1555         return d
1556
1557     def removeDirectory(self, pathstring):
1558         request = ".removeDirectory(%r)" % (pathstring,)
1559         self.log(request, level=OPERATIONAL)
1560
1561         path = self._path_from_string(pathstring)
1562         d = self._remove_object(path, must_be_directory=True)
1563         d.addBoth(_convert_error, request)
1564         return d
1565
1566     def _remove_object(self, path, must_be_directory=False, must_be_file=False):
1567         userpath = self._path_to_utf8(path)
1568         d = self._get_parent_or_node(path)
1569         def _got_parent( (parent, childname) ):
1570             if childname is None:
1571                 raise SFTPError(FX_NO_SUCH_FILE, "cannot remove an object specified by URI")
1572
1573             direntry = _direntry_for(parent, childname)
1574             d2 = defer.succeed(False)
1575             if not must_be_directory:
1576                 d2.addCallback(lambda ign: self._abandon_any_heisenfiles(userpath, direntry))
1577
1578             d2.addCallback(lambda abandoned:
1579                            parent.delete(childname, must_exist=not abandoned,
1580                                          must_be_directory=must_be_directory, must_be_file=must_be_file))
1581             return d2
1582         d.addCallback(_got_parent)
1583         return d
1584
1585     def openDirectory(self, pathstring):
1586         request = ".openDirectory(%r)" % (pathstring,)
1587         self.log(request, level=OPERATIONAL)
1588
1589         path = self._path_from_string(pathstring)
1590         d = self._get_parent_or_node(path)
1591         def _got_parent_or_node( (parent_or_node, childname) ):
1592             if noisy: self.log("_got_parent_or_node( (%r, %r) ) in openDirectory(%r)" %
1593                                (parent_or_node, childname, pathstring), level=NOISY)
1594             if childname is None:
1595                 return parent_or_node
1596             else:
1597                 return parent_or_node.get(childname)
1598         d.addCallback(_got_parent_or_node)
1599         def _list(dirnode):
1600             if dirnode.is_unknown():
1601                 raise SFTPError(FX_PERMISSION_DENIED,
1602                                 "cannot list an unknown cap as a directory. Upgrading the gateway "
1603                                 "to a later Tahoe-LAFS version may help")
1604             if not IDirectoryNode.providedBy(dirnode):
1605                 raise SFTPError(FX_PERMISSION_DENIED,
1606                                 "cannot list a file as if it were a directory")
1607
1608             d2 = dirnode.list()
1609             def _render(children):
1610                 parent_readonly = dirnode.is_readonly()
1611                 results = []
1612                 for filename, (child, metadata) in children.iteritems():
1613                     # The file size may be cached or absent.
1614                     metadata['no-write'] = _no_write(parent_readonly, child, metadata)
1615                     attrs = _populate_attrs(child, metadata)
1616                     filename_utf8 = filename.encode('utf-8')
1617                     longname = _lsLine(filename_utf8, attrs)
1618                     results.append( (filename_utf8, longname, attrs) )
1619                 return StoppableList(results)
1620             d2.addCallback(_render)
1621             return d2
1622         d.addCallback(_list)
1623         d.addBoth(_convert_error, request)
1624         return d
1625
1626     def getAttrs(self, pathstring, followLinks):
1627         request = ".getAttrs(%r, followLinks=%r)" % (pathstring, followLinks)
1628         self.log(request, level=OPERATIONAL)
1629
1630         # When asked about a specific file, report its current size.
1631         # TODO: the modification time for a mutable file should be
1632         # reported as the update time of the best version. But that
1633         # information isn't currently stored in mutable shares, I think.
1634
1635         path = self._path_from_string(pathstring)
1636         userpath = self._path_to_utf8(path)
1637         d = self._get_parent_or_node(path)
1638         def _got_parent_or_node( (parent_or_node, childname) ):
1639             if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
1640
1641             # Some clients will incorrectly try to get the attributes
1642             # of a file immediately after opening it, before it has been put
1643             # into the all_heisenfiles table. This is a race condition bug in
1644             # the client, but we handle it anyway by calling .sync() on all
1645             # files matching either the path or the direntry.
1646
1647             direntry = _direntry_for(parent_or_node, childname)
1648             d2 = self._sync_heisenfiles(userpath, direntry)
1649
1650             if childname is None:
1651                 node = parent_or_node
1652                 d2.addCallback(lambda ign: node.get_current_size())
1653                 d2.addCallback(lambda size:
1654                                _populate_attrs(node, {'no-write': node.is_unknown() or node.is_readonly()}, size=size))
1655             else:
1656                 parent = parent_or_node
1657                 d2.addCallback(lambda ign: parent.get_child_and_metadata_at_path([childname]))
1658                 def _got( (child, metadata) ):
1659                     if noisy: self.log("_got( (%r, %r) )" % (child, metadata), level=NOISY)
1660                     _assert(IDirectoryNode.providedBy(parent), parent=parent)
1661                     metadata['no-write'] = _no_write(parent.is_readonly(), child, metadata)
1662                     d3 = child.get_current_size()
1663                     d3.addCallback(lambda size: _populate_attrs(child, metadata, size=size))
1664                     return d3
1665                 def _nosuch(err):
1666                     if noisy: self.log("_nosuch(%r)" % (err,), level=NOISY)
1667                     err.trap(NoSuchChildError)
1668                     if noisy: self.log("checking open files:\nself._heisenfiles = %r\nall_heisenfiles = %r\ndirentry=%r" %
1669                                        (self._heisenfiles, all_heisenfiles, direntry), level=NOISY)
1670                     if direntry in all_heisenfiles:
1671                         files = all_heisenfiles[direntry]
1672                         if len(files) == 0:  # pragma: no cover
1673                             return err
1674                         # use the heisenfile that was most recently opened
1675                         return files[-1].getAttrs()
1676                     return err
1677                 d2.addCallbacks(_got, _nosuch)
1678             return d2
1679         d.addCallback(_got_parent_or_node)
1680         d.addBoth(_convert_error, request)
1681         return d
1682
1683     def setAttrs(self, pathstring, attrs):
1684         request = ".setAttrs(%r, %r)" % (pathstring, attrs)
1685         self.log(request, level=OPERATIONAL)
1686
1687         if "size" in attrs:
1688             # this would require us to download and re-upload the truncated/extended
1689             # file contents
1690             def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "setAttrs wth size attribute unsupported")
1691             return defer.execute(_unsupported)
1692
1693         path = self._path_from_string(pathstring)
1694         userpath = self._path_to_utf8(path)
1695         d = self._get_parent_or_node(path)
1696         def _got_parent_or_node( (parent_or_node, childname) ):
1697             if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
1698
1699             direntry = _direntry_for(parent_or_node, childname)
1700             d2 = self._update_attrs_for_heisenfiles(userpath, direntry, attrs)
1701
1702             def _update(updated_heisenfiles):
1703                 if childname is None:
1704                     if updated_heisenfiles:
1705                         return None
1706                     raise SFTPError(FX_NO_SUCH_FILE, userpath)
1707                 else:
1708                     desired_metadata = _attrs_to_metadata(attrs)
1709                     if noisy: self.log("desired_metadata = %r" % (desired_metadata,), level=NOISY)
1710
1711                     d3 = parent_or_node.set_metadata_for(childname, desired_metadata)
1712                     def _nosuch(err):
1713                         if updated_heisenfiles:
1714                             err.trap(NoSuchChildError)
1715                         else:
1716                             return err
1717                     d3.addErrback(_nosuch)
1718                     return d3
1719             d2.addCallback(_update)
1720             d2.addCallback(lambda ign: None)
1721             return d2
1722         d.addCallback(_got_parent_or_node)
1723         d.addBoth(_convert_error, request)
1724         return d
1725
1726     def readLink(self, pathstring):
1727         self.log(".readLink(%r)" % (pathstring,), level=OPERATIONAL)
1728
1729         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "readLink")
1730         return defer.execute(_unsupported)
1731
1732     def makeLink(self, linkPathstring, targetPathstring):
1733         self.log(".makeLink(%r, %r)" % (linkPathstring, targetPathstring), level=OPERATIONAL)
1734
1735         # If this is implemented, note the reversal of arguments described in point 7 of
1736         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>.
1737
1738         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "makeLink")
1739         return defer.execute(_unsupported)
1740
1741     def extendedRequest(self, extensionName, extensionData):
1742         self.log(".extendedRequest(%r, <data of length %r>)" % (extensionName, len(extensionData)), level=OPERATIONAL)
1743
1744         # We implement the three main OpenSSH SFTP extensions; see
1745         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
1746
1747         if extensionName == 'posix-rename@openssh.com':
1748             def _bad(): raise SFTPError(FX_BAD_MESSAGE, "could not parse posix-rename@openssh.com request")
1749
1750             if 4 > len(extensionData): return defer.execute(_bad)
1751             (fromPathLen,) = struct.unpack('>L', extensionData[0:4])
1752             if 8 + fromPathLen > len(extensionData): return defer.execute(_bad)
1753
1754             (toPathLen,) = struct.unpack('>L', extensionData[(4 + fromPathLen):(8 + fromPathLen)])
1755             if 8 + fromPathLen + toPathLen != len(extensionData): return defer.execute(_bad)
1756
1757             fromPathstring = extensionData[4:(4 + fromPathLen)]
1758             toPathstring = extensionData[(8 + fromPathLen):]
1759             d = self.renameFile(fromPathstring, toPathstring, overwrite=True)
1760
1761             # Twisted conch assumes that the response from an extended request is either
1762             # an error, or an FXP_EXTENDED_REPLY. But it happens to do the right thing
1763             # (respond with an FXP_STATUS message) if we return a Failure with code FX_OK.
1764             def _succeeded(ign):
1765                 raise SFTPError(FX_OK, "request succeeded")
1766             d.addCallback(_succeeded)
1767             return d
1768
1769         if extensionName == 'statvfs@openssh.com' or extensionName == 'fstatvfs@openssh.com':
1770             # f_bsize and f_frsize should be the same to avoid a bug in 'df'
1771             return defer.succeed(struct.pack('>11Q',
1772                 1024,         # uint64  f_bsize     /* file system block size */
1773                 1024,         # uint64  f_frsize    /* fundamental fs block size */
1774                 628318530,    # uint64  f_blocks    /* number of blocks (unit f_frsize) */
1775                 314159265,    # uint64  f_bfree     /* free blocks in file system */
1776                 314159265,    # uint64  f_bavail    /* free blocks for non-root */
1777                 200000000,    # uint64  f_files     /* total file inodes */
1778                 100000000,    # uint64  f_ffree     /* free file inodes */
1779                 100000000,    # uint64  f_favail    /* free file inodes for non-root */
1780                 0x1AF5,       # uint64  f_fsid      /* file system id */
1781                 2,            # uint64  f_flag      /* bit mask = ST_NOSUID; not ST_RDONLY */
1782                 65535,        # uint64  f_namemax   /* maximum filename length */
1783                 ))
1784
1785         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "unsupported %r request <data of length %r>" %
1786                                                                (extensionName, len(extensionData)))
1787         return defer.execute(_unsupported)
1788
1789     def realPath(self, pathstring):
1790         self.log(".realPath(%r)" % (pathstring,), level=OPERATIONAL)
1791
1792         return self._path_to_utf8(self._path_from_string(pathstring))
1793
1794     def _path_to_utf8(self, path):
1795         return (u"/" + u"/".join(path)).encode('utf-8')
1796
1797     def _path_from_string(self, pathstring):
1798         if noisy: self.log("CONVERT %r" % (pathstring,), level=NOISY)
1799
1800         _assert(isinstance(pathstring, str), pathstring=pathstring)
1801
1802         # The home directory is the root directory.
1803         pathstring = pathstring.strip("/")
1804         if pathstring == "" or pathstring == ".":
1805             path_utf8 = []
1806         else:
1807             path_utf8 = pathstring.split("/")
1808
1809         # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.2>
1810         # "Servers SHOULD interpret a path name component ".." as referring to
1811         #  the parent directory, and "." as referring to the current directory."
1812         path = []
1813         for p_utf8 in path_utf8:
1814             if p_utf8 == "..":
1815                 # ignore excess .. components at the root
1816                 if len(path) > 0:
1817                     path = path[:-1]
1818             elif p_utf8 != ".":
1819                 try:
1820                     p = p_utf8.decode('utf-8', 'strict')
1821                 except UnicodeError:
1822                     raise SFTPError(FX_NO_SUCH_FILE, "path could not be decoded as UTF-8")
1823                 path.append(p)
1824
1825         if noisy: self.log(" PATH %r" % (path,), level=NOISY)
1826         return path
1827
1828     def _get_root(self, path):
1829         # return Deferred (root, remaining_path)
1830         d = defer.succeed(None)
1831         if path and path[0] == u"uri":
1832             d.addCallback(lambda ign: self._client.create_node_from_uri(path[1].encode('utf-8')))
1833             d.addCallback(lambda root: (root, path[2:]))
1834         else:
1835             d.addCallback(lambda ign: (self._root, path))
1836         return d
1837
1838     def _get_parent_or_node(self, path):
1839         # return Deferred (parent, childname) or (node, None)
1840         d = self._get_root(path)
1841         def _got_root( (root, remaining_path) ):
1842             if not remaining_path:
1843                 return (root, None)
1844             else:
1845                 d2 = root.get_child_at_path(remaining_path[:-1])
1846                 d2.addCallback(lambda parent: (parent, remaining_path[-1]))
1847                 return d2
1848         d.addCallback(_got_root)
1849         return d
1850
1851
1852 class FakeTransport:
1853     implements(ITransport)
1854     def write(self, data):
1855         logmsg("FakeTransport.write(<data of length %r>)" % (len(data),), level=NOISY)
1856
1857     def writeSequence(self, data):
1858         logmsg("FakeTransport.writeSequence(...)", level=NOISY)
1859
1860     def loseConnection(self):
1861         logmsg("FakeTransport.loseConnection()", level=NOISY)
1862
1863     # getPeer and getHost can just raise errors, since we don't know what to return
1864
1865
1866 class ShellSession(PrefixingLogMixin):
1867     implements(ISession)
1868     def __init__(self, userHandler):
1869         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
1870         if noisy: self.log(".__init__(%r)" % (userHandler), level=NOISY)
1871
1872     def getPty(self, terminal, windowSize, attrs):
1873         self.log(".getPty(%r, %r, %r)" % (terminal, windowSize, attrs), level=OPERATIONAL)
1874
1875     def openShell(self, protocol):
1876         self.log(".openShell(%r)" % (protocol,), level=OPERATIONAL)
1877         if hasattr(protocol, 'transport') and protocol.transport is None:
1878             protocol.transport = FakeTransport()  # work around Twisted bug
1879
1880         return self._unsupported(protocol)
1881
1882     def execCommand(self, protocol, cmd):
1883         self.log(".execCommand(%r, %r)" % (protocol, cmd), level=OPERATIONAL)
1884         if hasattr(protocol, 'transport') and protocol.transport is None:
1885             protocol.transport = FakeTransport()  # work around Twisted bug
1886
1887         d = defer.succeed(None)
1888         if cmd == "df -P -k /":
1889             d.addCallback(lambda ign: protocol.write(
1890                           "Filesystem         1024-blocks      Used Available Capacity Mounted on\r\n"
1891                           "tahoe                628318530 314159265 314159265      50% /\r\n"))
1892             d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessDone(None))))
1893         else:
1894             d.addCallback(lambda ign: self._unsupported(protocol))
1895         return d
1896
1897     def _unsupported(self, protocol):
1898         d = defer.succeed(None)
1899         d.addCallback(lambda ign: protocol.errReceived(
1900                       "This server supports only the SFTP protocol. It does not support SCP,\r\n"
1901                       "interactive shell sessions, or commands other than one needed by sshfs.\r\n"))
1902         d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessTerminated(exitCode=1))))
1903         return d
1904
1905     def windowChanged(self, newWindowSize):
1906         self.log(".windowChanged(%r)" % (newWindowSize,), level=OPERATIONAL)
1907
1908     def eofReceived(self):
1909         self.log(".eofReceived()", level=OPERATIONAL)
1910
1911     def closed(self):
1912         self.log(".closed()", level=OPERATIONAL)
1913
1914
1915 # If you have an SFTPUserHandler and want something that provides ISession, you get
1916 # ShellSession(userHandler).
1917 # We use adaptation because this must be a different object to the SFTPUserHandler.
1918 components.registerAdapter(ShellSession, SFTPUserHandler, ISession)
1919
1920
1921 from allmydata.frontends.auth import AccountURLChecker, AccountFileChecker, NeedRootcapLookupScheme
1922
1923 class Dispatcher:
1924     implements(portal.IRealm)
1925     def __init__(self, client):
1926         self._client = client
1927
1928     def requestAvatar(self, avatarID, mind, interface):
1929         _assert(interface == IConchUser, interface=interface)
1930         rootnode = self._client.create_node_from_uri(avatarID.rootcap)
1931         handler = SFTPUserHandler(self._client, rootnode, avatarID.username)
1932         return (interface, handler, handler.logout)
1933
1934
1935 class SFTPServer(service.MultiService):
1936     def __init__(self, client, accountfile, accounturl,
1937                  sftp_portstr, pubkey_file, privkey_file):
1938         service.MultiService.__init__(self)
1939
1940         r = Dispatcher(client)
1941         p = portal.Portal(r)
1942
1943         if accountfile:
1944             c = AccountFileChecker(self, accountfile)
1945             p.registerChecker(c)
1946         if accounturl:
1947             c = AccountURLChecker(self, accounturl)
1948             p.registerChecker(c)
1949         if not accountfile and not accounturl:
1950             # we could leave this anonymous, with just the /uri/CAP form
1951             raise NeedRootcapLookupScheme("must provide an account file or URL")
1952
1953         pubkey = keys.Key.fromFile(pubkey_file)
1954         privkey = keys.Key.fromFile(privkey_file)
1955         class SSHFactory(factory.SSHFactory):
1956             publicKeys = {pubkey.sshType(): pubkey}
1957             privateKeys = {privkey.sshType(): privkey}
1958             def getPrimes(self):
1959                 try:
1960                     # if present, this enables diffie-hellman-group-exchange
1961                     return primes.parseModuliFile("/etc/ssh/moduli")
1962                 except IOError:
1963                     return None
1964
1965         f = SSHFactory()
1966         f.portal = p
1967
1968         s = strports.service(sftp_portstr, f)
1969         s.setServiceParent(self)