]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/sftpd.py
f148384d3781340a2d54bcbf72121d2b23779210
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / sftpd.py
1
2 import heapq, traceback, array, stat, struct
3 from types import NoneType
4 from stat import S_IFREG, S_IFDIR
5 from time import time, strftime, localtime
6
7 from zope.interface import implements
8 from twisted.python import components
9 from twisted.application import service, strports
10 from twisted.conch.ssh import factory, keys, session
11 from twisted.conch.ssh.filetransfer import FileTransferServer, SFTPError, \
12      FX_NO_SUCH_FILE, FX_OP_UNSUPPORTED, FX_PERMISSION_DENIED, FX_EOF, \
13      FX_BAD_MESSAGE, FX_FAILURE, FX_OK
14 from twisted.conch.ssh.filetransfer import FXF_READ, FXF_WRITE, FXF_APPEND, \
15      FXF_CREAT, FXF_TRUNC, FXF_EXCL
16 from twisted.conch.interfaces import ISFTPServer, ISFTPFile, IConchUser, ISession
17 from twisted.conch.avatar import ConchUser
18 from twisted.conch.openssh_compat import primes
19 from twisted.cred import portal
20 from twisted.internet.error import ProcessDone, ProcessTerminated
21 from twisted.python.failure import Failure
22 from twisted.internet.interfaces import ITransport
23
24 from twisted.internet import defer
25 from twisted.internet.interfaces import IFinishableConsumer
26 from foolscap.api import eventually
27 from allmydata.util import deferredutil
28
29 from allmydata.util.consumer import download_to_data
30 from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
31      NoSuchChildError, ChildOfWrongTypeError
32 from allmydata.mutable.common import NotWriteableError
33 from allmydata.mutable.publish import MutableFileHandle
34 from allmydata.immutable.upload import FileHandle
35 from allmydata.dirnode import update_metadata
36 from allmydata.util.fileutil import EncryptedTemporaryFile
37
38 noisy = True
39 use_foolscap_logging = True
40
41 from allmydata.util.log import NOISY, OPERATIONAL, WEIRD, \
42     msg as _msg, err as _err, PrefixingLogMixin as _PrefixingLogMixin
43
44 if use_foolscap_logging:
45     (logmsg, logerr, PrefixingLogMixin) = (_msg, _err, _PrefixingLogMixin)
46 else:  # pragma: no cover
47     def logmsg(s, level=None):
48         print s
49     def logerr(s, level=None):
50         print s
51     class PrefixingLogMixin:
52         def __init__(self, facility=None, prefix=''):
53             self.prefix = prefix
54         def log(self, s, level=None):
55             print "%r %s" % (self.prefix, s)
56
57
58 def eventually_callback(d):
59     return lambda res: eventually(d.callback, res)
60
61 def eventually_errback(d):
62     return lambda err: eventually(d.errback, err)
63
64
65 def _utf8(x):
66     if isinstance(x, unicode):
67         return x.encode('utf-8')
68     if isinstance(x, str):
69         return x
70     return repr(x)
71
72
73 def _to_sftp_time(t):
74     """SFTP times are unsigned 32-bit integers representing UTC seconds
75     (ignoring leap seconds) since the Unix epoch, January 1 1970 00:00 UTC.
76     A Tahoe time is the corresponding float."""
77     return long(t) & 0xFFFFFFFFL
78
79
80 def _convert_error(res, request):
81     """If res is not a Failure, return it, otherwise reraise the appropriate
82     SFTPError."""
83
84     if not isinstance(res, Failure):
85         logged_res = res
86         if isinstance(res, str): logged_res = "<data of length %r>" % (len(res),)
87         logmsg("SUCCESS %r %r" % (request, logged_res,), level=OPERATIONAL)
88         return res
89
90     err = res
91     logmsg("RAISE %r %r" % (request, err.value), level=OPERATIONAL)
92     try:
93         if noisy: logmsg(traceback.format_exc(err.value), level=NOISY)
94     except Exception:  # pragma: no cover
95         pass
96
97     # The message argument to SFTPError must not reveal information that
98     # might compromise anonymity, if we are running over an anonymous network.
99
100     if err.check(SFTPError):
101         # original raiser of SFTPError has responsibility to ensure anonymity
102         raise err
103     if err.check(NoSuchChildError):
104         childname = _utf8(err.value.args[0])
105         raise SFTPError(FX_NO_SUCH_FILE, childname)
106     if err.check(NotWriteableError) or err.check(ChildOfWrongTypeError):
107         msg = _utf8(err.value.args[0])
108         raise SFTPError(FX_PERMISSION_DENIED, msg)
109     if err.check(ExistingChildError):
110         # Versions of SFTP after v3 (which is what twisted.conch implements)
111         # define a specific error code for this case: FX_FILE_ALREADY_EXISTS.
112         # However v3 doesn't; instead, other servers such as sshd return
113         # FX_FAILURE. The gvfs SFTP backend, for example, depends on this
114         # to translate the error to the equivalent of POSIX EEXIST, which is
115         # necessary for some picky programs (such as gedit).
116         msg = _utf8(err.value.args[0])
117         raise SFTPError(FX_FAILURE, msg)
118     if err.check(NotImplementedError):
119         raise SFTPError(FX_OP_UNSUPPORTED, _utf8(err.value))
120     if err.check(EOFError):
121         raise SFTPError(FX_EOF, "end of file reached")
122     if err.check(defer.FirstError):
123         _convert_error(err.value.subFailure, request)
124
125     # We assume that the error message is not anonymity-sensitive.
126     raise SFTPError(FX_FAILURE, _utf8(err.value))
127
128
129 def _repr_flags(flags):
130     return "|".join([f for f in
131                      [(flags & FXF_READ)   and "FXF_READ"   or None,
132                       (flags & FXF_WRITE)  and "FXF_WRITE"  or None,
133                       (flags & FXF_APPEND) and "FXF_APPEND" or None,
134                       (flags & FXF_CREAT)  and "FXF_CREAT"  or None,
135                       (flags & FXF_TRUNC)  and "FXF_TRUNC"  or None,
136                       (flags & FXF_EXCL)   and "FXF_EXCL"   or None,
137                      ]
138                      if f])
139
140
141 def _lsLine(name, attrs):
142     st_uid = "tahoe"
143     st_gid = "tahoe"
144     st_mtime = attrs.get("mtime", 0)
145     st_mode = attrs["permissions"]
146
147     # Some clients won't tolerate '?' in the size field (#1337).
148     st_size = attrs.get("size", 0)
149
150     # We don't know how many links there really are to this object.
151     st_nlink = 1
152
153     # Based on <https://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
154     # We previously could not call the version in Twisted because we needed the change
155     # <https://twistedmatrix.com/trac/changeset/25412> (released in Twisted v8.2).
156     # Since we now depend on Twisted v10.1, consider calling Twisted's version.
157
158     mode = st_mode
159     perms = array.array('c', '-'*10)
160     ft = stat.S_IFMT(mode)
161     if   stat.S_ISDIR(ft): perms[0] = 'd'
162     elif stat.S_ISREG(ft): perms[0] = '-'
163     else: perms[0] = '?'
164     # user
165     if mode&stat.S_IRUSR: perms[1] = 'r'
166     if mode&stat.S_IWUSR: perms[2] = 'w'
167     if mode&stat.S_IXUSR: perms[3] = 'x'
168     # group
169     if mode&stat.S_IRGRP: perms[4] = 'r'
170     if mode&stat.S_IWGRP: perms[5] = 'w'
171     if mode&stat.S_IXGRP: perms[6] = 'x'
172     # other
173     if mode&stat.S_IROTH: perms[7] = 'r'
174     if mode&stat.S_IWOTH: perms[8] = 'w'
175     if mode&stat.S_IXOTH: perms[9] = 'x'
176     # suid/sgid never set
177
178     l = perms.tostring()
179     l += str(st_nlink).rjust(5) + ' '
180     un = str(st_uid)
181     l += un.ljust(9)
182     gr = str(st_gid)
183     l += gr.ljust(9)
184     sz = str(st_size)
185     l += sz.rjust(8)
186     l += ' '
187     day = 60 * 60 * 24
188     sixmo = day * 7 * 26
189     now = time()
190     if st_mtime + sixmo < now or st_mtime > now + day:
191         # mtime is more than 6 months ago, or more than one day in the future
192         l += strftime("%b %d  %Y ", localtime(st_mtime))
193     else:
194         l += strftime("%b %d %H:%M ", localtime(st_mtime))
195     l += name
196     return l
197
198
199 def _no_write(parent_readonly, child, metadata=None):
200     """Whether child should be listed as having read-only permissions in parent."""
201
202     if child.is_unknown():
203         return True
204     elif child.is_mutable():
205         return child.is_readonly()
206     elif parent_readonly or IDirectoryNode.providedBy(child):
207         return True
208     else:
209         return metadata is not None and metadata.get('no-write', False)
210
211
212 def _populate_attrs(childnode, metadata, size=None):
213     attrs = {}
214
215     # The permissions must have the S_IFDIR (040000) or S_IFREG (0100000)
216     # bits, otherwise the client may refuse to open a directory.
217     # Also, sshfs run as a non-root user requires files and directories
218     # to be world-readable/writeable.
219     # It is important that we never set the executable bits on files.
220     #
221     # Directories and unknown nodes have no size, and SFTP doesn't
222     # require us to make one up.
223     #
224     # childnode might be None, meaning that the file doesn't exist yet,
225     # but we're going to write it later.
226
227     if childnode and childnode.is_unknown():
228         perms = 0
229     elif childnode and IDirectoryNode.providedBy(childnode):
230         perms = S_IFDIR | 0777
231     else:
232         # For files, omit the size if we don't immediately know it.
233         if childnode and size is None:
234             size = childnode.get_size()
235         if size is not None:
236             assert isinstance(size, (int, long)) and not isinstance(size, bool), repr(size)
237             attrs['size'] = size
238         perms = S_IFREG | 0666
239
240     if metadata:
241         if metadata.get('no-write', False):
242             perms &= S_IFDIR | S_IFREG | 0555  # clear 'w' bits
243
244         # See webapi.txt for what these times mean.
245         # We would prefer to omit atime, but SFTP version 3 can only
246         # accept mtime if atime is also set.
247         if 'linkmotime' in metadata.get('tahoe', {}):
248             attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['tahoe']['linkmotime'])
249         elif 'mtime' in metadata:
250             attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['mtime'])
251
252         if 'linkcrtime' in metadata.get('tahoe', {}):
253             attrs['createtime'] = _to_sftp_time(metadata['tahoe']['linkcrtime'])
254
255     attrs['permissions'] = perms
256
257     # twisted.conch.ssh.filetransfer only implements SFTP version 3,
258     # which doesn't include SSH_FILEXFER_ATTR_FLAGS.
259
260     return attrs
261
262
263 def _attrs_to_metadata(attrs):
264     metadata = {}
265
266     for key in attrs:
267         if key == "mtime" or key == "ctime" or key == "createtime":
268             metadata[key] = long(attrs[key])
269         elif key.startswith("ext_"):
270             metadata[key] = str(attrs[key])
271
272     perms = attrs.get('permissions', stat.S_IWUSR)
273     if not (perms & stat.S_IWUSR):
274         metadata['no-write'] = True
275
276     return metadata
277
278
279 def _direntry_for(filenode_or_parent, childname, filenode=None):
280     assert isinstance(childname, (unicode, NoneType)), childname
281
282     if childname is None:
283         filenode_or_parent = filenode
284
285     if filenode_or_parent:
286         rw_uri = filenode_or_parent.get_write_uri()
287         if rw_uri and childname:
288             return rw_uri + "/" + childname.encode('utf-8')
289         else:
290             return rw_uri
291
292     return None
293
294
295 class OverwriteableFileConsumer(PrefixingLogMixin):
296     implements(IFinishableConsumer)
297     """I act both as a consumer for the download of the original file contents, and as a
298     wrapper for a temporary file that records the downloaded data and any overwrites.
299     I use a priority queue to keep track of which regions of the file have been overwritten
300     but not yet downloaded, so that the download does not clobber overwritten data.
301     I use another priority queue to record milestones at which to make callbacks
302     indicating that a given number of bytes have been downloaded.
303
304     The temporary file reflects the contents of the file that I represent, except that:
305      - regions that have neither been downloaded nor overwritten, if present,
306        contain garbage.
307      - the temporary file may be shorter than the represented file (it is never longer).
308        The latter's current size is stored in self.current_size.
309
310     This abstraction is mostly independent of SFTP. Consider moving it, if it is found
311     useful for other frontends."""
312
313     def __init__(self, download_size, tempfile_maker):
314         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
315         if noisy: self.log(".__init__(%r, %r)" % (download_size, tempfile_maker), level=NOISY)
316         self.download_size = download_size
317         self.current_size = download_size
318         self.f = tempfile_maker()
319         self.downloaded = 0
320         self.milestones = []  # empty heap of (offset, d)
321         self.overwrites = []  # empty heap of (start, end)
322         self.is_closed = False
323         self.done = self.when_reached(download_size)  # adds a milestone
324         self.is_done = False
325         def _signal_done(ign):
326             if noisy: self.log("DONE", level=NOISY)
327             self.is_done = True
328         self.done.addCallback(_signal_done)
329         self.producer = None
330
331     def get_file(self):
332         return self.f
333
334     def get_current_size(self):
335         return self.current_size
336
337     def set_current_size(self, size):
338         if noisy: self.log(".set_current_size(%r), current_size = %r, downloaded = %r" %
339                            (size, self.current_size, self.downloaded), level=NOISY)
340         if size < self.current_size or size < self.downloaded:
341             self.f.truncate(size)
342         if size > self.current_size:
343             self.overwrite(self.current_size, "\x00" * (size - self.current_size))
344         self.current_size = size
345
346         # make the invariant self.download_size <= self.current_size be true again
347         if size < self.download_size:
348             self.download_size = size
349
350         if self.downloaded >= self.download_size:
351             self.finish()
352
353     def registerProducer(self, p, streaming):
354         if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY)
355         if self.producer is not None:
356             raise RuntimeError("producer is already registered")
357
358         self.producer = p
359         if streaming:
360             # call resumeProducing once to start things off
361             p.resumeProducing()
362         else:
363             def _iterate():
364                 if not self.is_done:
365                     p.resumeProducing()
366                     eventually(_iterate)
367             _iterate()
368
369     def write(self, data):
370         if noisy: self.log(".write(<data of length %r>)" % (len(data),), level=NOISY)
371         if self.is_closed:
372             return
373
374         if self.downloaded >= self.download_size:
375             return
376
377         next_downloaded = self.downloaded + len(data)
378         if next_downloaded > self.download_size:
379             data = data[:(self.download_size - self.downloaded)]
380
381         while len(self.overwrites) > 0:
382             (start, end) = self.overwrites[0]
383             if start >= next_downloaded:
384                 # This and all remaining overwrites are after the data we just downloaded.
385                 break
386             if start > self.downloaded:
387                 # The data we just downloaded has been partially overwritten.
388                 # Write the prefix of it that precedes the overwritten region.
389                 self.f.seek(self.downloaded)
390                 self.f.write(data[:(start - self.downloaded)])
391
392             # This merges consecutive overwrites if possible, which allows us to detect the
393             # case where the download can be stopped early because the remaining region
394             # to download has already been fully overwritten.
395             heapq.heappop(self.overwrites)
396             while len(self.overwrites) > 0:
397                 (start1, end1) = self.overwrites[0]
398                 if start1 > end:
399                     break
400                 end = end1
401                 heapq.heappop(self.overwrites)
402
403             if end >= next_downloaded:
404                 # This overwrite extends past the downloaded data, so there is no
405                 # more data to consider on this call.
406                 heapq.heappush(self.overwrites, (next_downloaded, end))
407                 self._update_downloaded(next_downloaded)
408                 return
409             elif end >= self.downloaded:
410                 data = data[(end - self.downloaded):]
411                 self._update_downloaded(end)
412
413         self.f.seek(self.downloaded)
414         self.f.write(data)
415         self._update_downloaded(next_downloaded)
416
417     def _update_downloaded(self, new_downloaded):
418         self.downloaded = new_downloaded
419         milestone = new_downloaded
420         if len(self.overwrites) > 0:
421             (start, end) = self.overwrites[0]
422             if start <= new_downloaded and end > milestone:
423                 milestone = end
424
425         while len(self.milestones) > 0:
426             (next, d) = self.milestones[0]
427             if next > milestone:
428                 return
429             if noisy: self.log("MILESTONE %r %r" % (next, d), level=NOISY)
430             heapq.heappop(self.milestones)
431             eventually(d.callback, None)
432
433         if milestone >= self.download_size:
434             self.finish()
435
436     def overwrite(self, offset, data):
437         if noisy: self.log(".overwrite(%r, <data of length %r>)" % (offset, len(data)), level=NOISY)
438         if offset > self.current_size:
439             # Normally writing at an offset beyond the current end-of-file
440             # would leave a hole that appears filled with zeroes. However, an
441             # EncryptedTemporaryFile doesn't behave like that (if there is a
442             # hole in the file on disk, the zeroes that are read back will be
443             # XORed with the keystream). So we must explicitly write zeroes in
444             # the gap between the current EOF and the offset.
445
446             self.f.seek(self.current_size)
447             self.f.write("\x00" * (offset - self.current_size))
448             start = self.current_size
449         else:
450             self.f.seek(offset)
451             start = offset
452
453         self.f.write(data)
454         end = offset + len(data)
455         self.current_size = max(self.current_size, end)
456         if end > self.downloaded:
457             heapq.heappush(self.overwrites, (start, end))
458
459     def read(self, offset, length):
460         """When the data has been read, callback the Deferred that we return with this data.
461         Otherwise errback the Deferred that we return.
462         The caller must perform no more overwrites until the Deferred has fired."""
463
464         if noisy: self.log(".read(%r, %r), current_size = %r" % (offset, length, self.current_size), level=NOISY)
465
466         # Note that the overwrite method is synchronous. When a write request is processed
467         # (e.g. a writeChunk request on the async queue of GeneralSFTPFile), overwrite will
468         # be called and will update self.current_size if necessary before returning. Therefore,
469         # self.current_size will be up-to-date for a subsequent call to this read method, and
470         # so it is correct to do the check for a read past the end-of-file here.
471         if offset >= self.current_size:
472             def _eof(): raise EOFError("read past end of file")
473             return defer.execute(_eof)
474
475         if offset + length > self.current_size:
476             length = self.current_size - offset
477             if noisy: self.log("truncating read to %r bytes" % (length,), level=NOISY)
478
479         needed = min(offset + length, self.download_size)
480         d = self.when_reached(needed)
481         def _reached(ign):
482             # It is not necessarily the case that self.downloaded >= needed, because
483             # the file might have been truncated (thus truncating the download) and
484             # then extended.
485
486             assert self.current_size >= offset + length, (self.current_size, offset, length)
487             if noisy: self.log("self.f = %r" % (self.f,), level=NOISY)
488             self.f.seek(offset)
489             return self.f.read(length)
490         d.addCallback(_reached)
491         return d
492
493     def when_reached(self, index):
494         if noisy: self.log(".when_reached(%r)" % (index,), level=NOISY)
495         if index <= self.downloaded:  # already reached
496             if noisy: self.log("already reached %r" % (index,), level=NOISY)
497             return defer.succeed(None)
498         d = defer.Deferred()
499         def _reached(ign):
500             if noisy: self.log("reached %r" % (index,), level=NOISY)
501             return ign
502         d.addCallback(_reached)
503         heapq.heappush(self.milestones, (index, d))
504         return d
505
506     def when_done(self):
507         return self.done
508
509     def finish(self):
510         """Called by the producer when it has finished producing, or when we have
511         received enough bytes, or as a result of a close. Defined by IFinishableConsumer."""
512
513         while len(self.milestones) > 0:
514             (next, d) = self.milestones[0]
515             if noisy: self.log("MILESTONE FINISH %r %r" % (next, d), level=NOISY)
516             heapq.heappop(self.milestones)
517             # The callback means that the milestone has been reached if
518             # it is ever going to be. Note that the file may have been
519             # truncated to before the milestone.
520             eventually(d.callback, None)
521
522     def close(self):
523         if not self.is_closed:
524             self.is_closed = True
525             try:
526                 self.f.close()
527             except Exception, e:
528                 self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
529         self.finish()
530
531     def unregisterProducer(self):
532         pass
533
534
535 SIZE_THRESHOLD = 1000
536
537
538 class ShortReadOnlySFTPFile(PrefixingLogMixin):
539     implements(ISFTPFile)
540     """I represent a file handle to a particular file on an SFTP connection.
541     I am used only for short immutable files opened in read-only mode.
542     When I am created, the file contents start to be downloaded to memory.
543     self.async is used to delay read requests until the download has finished."""
544
545     def __init__(self, userpath, filenode, metadata):
546         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
547         if noisy: self.log(".__init__(%r, %r, %r)" % (userpath, filenode, metadata), level=NOISY)
548
549         assert isinstance(userpath, str) and IFileNode.providedBy(filenode), (userpath, filenode)
550         self.filenode = filenode
551         self.metadata = metadata
552         self.async = download_to_data(filenode)
553         self.closed = False
554
555     def readChunk(self, offset, length):
556         request = ".readChunk(%r, %r)" % (offset, length)
557         self.log(request, level=OPERATIONAL)
558
559         if self.closed:
560             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
561             return defer.execute(_closed)
562
563         d = defer.Deferred()
564         def _read(data):
565             if noisy: self.log("_read(<data of length %r>) in readChunk(%r, %r)" % (len(data), offset, length), level=NOISY)
566
567             # "In response to this request, the server will read as many bytes as it
568             #  can from the file (up to 'len'), and return them in a SSH_FXP_DATA
569             #  message.  If an error occurs or EOF is encountered before reading any
570             #  data, the server will respond with SSH_FXP_STATUS.  For normal disk
571             #  files, it is guaranteed that this will read the specified number of
572             #  bytes, or up to end of file."
573             #
574             # i.e. we respond with an EOF error iff offset is already at EOF.
575
576             if offset >= len(data):
577                 eventually(d.errback, SFTPError(FX_EOF, "read at or past end of file"))
578             else:
579                 eventually(d.callback, data[offset:offset+length])  # truncated if offset+length > len(data)
580             return data
581         self.async.addCallbacks(_read, eventually_errback(d))
582         d.addBoth(_convert_error, request)
583         return d
584
585     def writeChunk(self, offset, data):
586         self.log(".writeChunk(%r, <data of length %r>) denied" % (offset, len(data)), level=OPERATIONAL)
587
588         def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
589         return defer.execute(_denied)
590
591     def close(self):
592         self.log(".close()", level=OPERATIONAL)
593
594         self.closed = True
595         return defer.succeed(None)
596
597     def getAttrs(self):
598         request = ".getAttrs()"
599         self.log(request, level=OPERATIONAL)
600
601         if self.closed:
602             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
603             return defer.execute(_closed)
604
605         d = defer.execute(_populate_attrs, self.filenode, self.metadata)
606         d.addBoth(_convert_error, request)
607         return d
608
609     def setAttrs(self, attrs):
610         self.log(".setAttrs(%r) denied" % (attrs,), level=OPERATIONAL)
611         def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
612         return defer.execute(_denied)
613
614
615 class GeneralSFTPFile(PrefixingLogMixin):
616     implements(ISFTPFile)
617     """I represent a file handle to a particular file on an SFTP connection.
618     I wrap an instance of OverwriteableFileConsumer, which is responsible for
619     storing the file contents. In order to allow write requests to be satisfied
620     immediately, there is effectively a FIFO queue between requests made to this
621     file handle, and requests to my OverwriteableFileConsumer. This queue is
622     implemented by the callback chain of self.async.
623
624     When first constructed, I am in an 'unopened' state that causes most
625     operations to be delayed until 'open' is called."""
626
627     def __init__(self, userpath, flags, close_notify, convergence):
628         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
629         if noisy: self.log(".__init__(%r, %r = %r, %r, <convergence censored>)" %
630                            (userpath, flags, _repr_flags(flags), close_notify), level=NOISY)
631
632         assert isinstance(userpath, str), userpath
633         self.userpath = userpath
634         self.flags = flags
635         self.close_notify = close_notify
636         self.convergence = convergence
637         self.async = defer.Deferred()
638         # Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created.
639         self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL)
640         self.closed = False
641         self.abandoned = False
642         self.parent = None
643         self.childname = None
644         self.filenode = None
645         self.metadata = None
646
647         # self.consumer should only be relied on in callbacks for self.async, since it might
648         # not be set before then.
649         self.consumer = None
650
651     def open(self, parent=None, childname=None, filenode=None, metadata=None):
652         self.log(".open(parent=%r, childname=%r, filenode=%r, metadata=%r)" %
653                  (parent, childname, filenode, metadata), level=OPERATIONAL)
654
655         assert isinstance(childname, (unicode, NoneType)), childname
656         # If the file has been renamed, the new (parent, childname) takes precedence.
657         if self.parent is None:
658             self.parent = parent
659         if self.childname is None:
660             self.childname = childname
661         self.filenode = filenode
662         self.metadata = metadata
663
664         assert not self.closed, self
665         tempfile_maker = EncryptedTemporaryFile
666
667         if (self.flags & FXF_TRUNC) or not filenode:
668             # We're either truncating or creating the file, so we don't need the old contents.
669             self.consumer = OverwriteableFileConsumer(0, tempfile_maker)
670             self.consumer.finish()
671         else:
672             assert IFileNode.providedBy(filenode), filenode
673
674             self.async.addCallback(lambda ignored: filenode.get_best_readable_version())
675
676             def _read(version):
677                 if noisy: self.log("_read", level=NOISY)
678                 download_size = version.get_size()
679                 assert download_size is not None
680
681                 self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker)
682
683                 version.read(self.consumer, 0, None)
684             self.async.addCallback(_read)
685
686         eventually(self.async.callback, None)
687
688         if noisy: self.log("open done", level=NOISY)
689         return self
690
691     def get_userpath(self):
692         return self.userpath
693
694     def get_direntry(self):
695         return _direntry_for(self.parent, self.childname)
696
697     def rename(self, new_userpath, new_parent, new_childname):
698         self.log(".rename(%r, %r, %r)" % (new_userpath, new_parent, new_childname), level=OPERATIONAL)
699
700         assert isinstance(new_userpath, str) and isinstance(new_childname, unicode), (new_userpath, new_childname)
701         self.userpath = new_userpath
702         self.parent = new_parent
703         self.childname = new_childname
704
705     def abandon(self):
706         self.log(".abandon()", level=OPERATIONAL)
707
708         self.abandoned = True
709
710     def sync(self, ign=None):
711         # The ign argument allows some_file.sync to be used as a callback.
712         self.log(".sync()", level=OPERATIONAL)
713
714         d = defer.Deferred()
715         self.async.addBoth(eventually_callback(d))
716         def _done(res):
717             if noisy: self.log("_done(%r) in .sync()" % (res,), level=NOISY)
718             return res
719         d.addBoth(_done)
720         return d
721
722     def readChunk(self, offset, length):
723         request = ".readChunk(%r, %r)" % (offset, length)
724         self.log(request, level=OPERATIONAL)
725
726         if not (self.flags & FXF_READ):
727             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for reading")
728             return defer.execute(_denied)
729
730         if self.closed:
731             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
732             return defer.execute(_closed)
733
734         d = defer.Deferred()
735         def _read(ign):
736             if noisy: self.log("_read in readChunk(%r, %r)" % (offset, length), level=NOISY)
737             d2 = self.consumer.read(offset, length)
738             d2.addCallbacks(eventually_callback(d), eventually_errback(d))
739             # It is correct to drop d2 here.
740             return None
741         self.async.addCallbacks(_read, eventually_errback(d))
742         d.addBoth(_convert_error, request)
743         return d
744
745     def writeChunk(self, offset, data):
746         self.log(".writeChunk(%r, <data of length %r>)" % (offset, len(data)), level=OPERATIONAL)
747
748         if not (self.flags & FXF_WRITE):
749             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
750             return defer.execute(_denied)
751
752         if self.closed:
753             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")
754             return defer.execute(_closed)
755
756         self.has_changed = True
757
758         # Note that we return without waiting for the write to occur. Reads and
759         # close wait for prior writes, and will fail if any prior operation failed.
760         # This is ok because SFTP makes no guarantee that the write completes
761         # before the request does. In fact it explicitly allows write errors to be
762         # delayed until close:
763         #   "One should note that on some server platforms even a close can fail.
764         #    This can happen e.g. if the server operating system caches writes,
765         #    and an error occurs while flushing cached writes during the close."
766
767         def _write(ign):
768             if noisy: self.log("_write in .writeChunk(%r, <data of length %r>), current_size = %r" %
769                                (offset, len(data), self.consumer.get_current_size()), level=NOISY)
770             # FXF_APPEND means that we should always write at the current end of file.
771             write_offset = offset
772             if self.flags & FXF_APPEND:
773                 write_offset = self.consumer.get_current_size()
774
775             self.consumer.overwrite(write_offset, data)
776             if noisy: self.log("overwrite done", level=NOISY)
777             return None
778         self.async.addCallback(_write)
779         # don't addErrback to self.async, just allow subsequent async ops to fail.
780         return defer.succeed(None)
781
782     def close(self):
783         request = ".close()"
784         self.log(request, level=OPERATIONAL)
785
786         if self.closed:
787             return defer.succeed(None)
788
789         # This means that close has been called, not that the close has succeeded.
790         self.closed = True
791
792         if not (self.flags & (FXF_WRITE | FXF_CREAT)):
793             def _readonly_close():
794                 if self.consumer:
795                     self.consumer.close()
796             return defer.execute(_readonly_close)
797
798         # We must capture the abandoned, parent, and childname variables synchronously
799         # at the close call. This is needed by the correctness arguments in the comments
800         # for _abandon_any_heisenfiles and _rename_heisenfiles.
801         # Note that the file must have been opened before it can be closed.
802         abandoned = self.abandoned
803         parent = self.parent
804         childname = self.childname
805
806         # has_changed is set when writeChunk is called, not when the write occurs, so
807         # it is correct to optimize out the commit if it is False at the close call.
808         has_changed = self.has_changed
809
810         def _committed(res):
811             if noisy: self.log("_committed(%r)" % (res,), level=NOISY)
812
813             self.consumer.close()
814
815             # We must close_notify before re-firing self.async.
816             if self.close_notify:
817                 self.close_notify(self.userpath, self.parent, self.childname, self)
818             return res
819
820         def _close(ign):
821             d2 = self.consumer.when_done()
822             if self.filenode and self.filenode.is_mutable():
823                 self.log("update mutable file %r childname=%r metadata=%r" % (self.filenode, childname, self.metadata), level=OPERATIONAL)
824                 if self.metadata.get('no-write', False) and not self.filenode.is_readonly():
825                     assert parent and childname, (parent, childname, self.metadata)
826                     d2.addCallback(lambda ign: parent.set_metadata_for(childname, self.metadata))
827
828                 d2.addCallback(lambda ign: self.filenode.overwrite(MutableFileHandle(self.consumer.get_file())))
829             else:
830                 def _add_file(ign):
831                     self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
832                     u = FileHandle(self.consumer.get_file(), self.convergence)
833                     return parent.add_file(childname, u, metadata=self.metadata)
834                 d2.addCallback(_add_file)
835
836             d2.addBoth(_committed)
837             return d2
838
839         d = defer.Deferred()
840
841         # If the file has been abandoned, we don't want the close operation to get "stuck",
842         # even if self.async fails to re-fire. Doing the close independently of self.async
843         # in that case ensures that dropping an ssh connection is sufficient to abandon
844         # any heisenfiles that were not explicitly closed in that connection.
845         if abandoned or not has_changed:
846             d.addCallback(_committed)
847         else:
848             self.async.addCallback(_close)
849
850         self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
851         d.addBoth(_convert_error, request)
852         return d
853
854     def getAttrs(self):
855         request = ".getAttrs()"
856         self.log(request, level=OPERATIONAL)
857
858         if self.closed:
859             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
860             return defer.execute(_closed)
861
862         # Optimization for read-only handles, when we already know the metadata.
863         if not (self.flags & (FXF_WRITE | FXF_CREAT)) and self.metadata and self.filenode and not self.filenode.is_mutable():
864             return defer.succeed(_populate_attrs(self.filenode, self.metadata))
865
866         d = defer.Deferred()
867         def _get(ign):
868             if noisy: self.log("_get(%r) in %r, filenode = %r, metadata = %r" % (ign, request, self.filenode, self.metadata), level=NOISY)
869
870             # self.filenode might be None, but that's ok.
871             attrs = _populate_attrs(self.filenode, self.metadata, size=self.consumer.get_current_size())
872             eventually(d.callback, attrs)
873             return None
874         self.async.addCallbacks(_get, eventually_errback(d))
875         d.addBoth(_convert_error, request)
876         return d
877
878     def setAttrs(self, attrs, only_if_at=None):
879         request = ".setAttrs(%r, only_if_at=%r)" % (attrs, only_if_at)
880         self.log(request, level=OPERATIONAL)
881
882         if not (self.flags & FXF_WRITE):
883             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
884             return defer.execute(_denied)
885
886         if self.closed:
887             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot set attributes for a closed file handle")
888             return defer.execute(_closed)
889
890         size = attrs.get("size", None)
891         if size is not None and (not isinstance(size, (int, long)) or size < 0):
892             def _bad(): raise SFTPError(FX_BAD_MESSAGE, "new size is not a valid nonnegative integer")
893             return defer.execute(_bad)
894
895         d = defer.Deferred()
896         def _set(ign):
897             if noisy: self.log("_set(%r) in %r" % (ign, request), level=NOISY)
898             current_direntry = _direntry_for(self.parent, self.childname, self.filenode)
899             if only_if_at and only_if_at != current_direntry:
900                 if noisy: self.log("not setting attributes: current_direntry=%r in %r" %
901                                    (current_direntry, request), level=NOISY)
902                 return None
903
904             now = time()
905             self.metadata = update_metadata(self.metadata, _attrs_to_metadata(attrs), now)
906             if size is not None:
907                 # TODO: should we refuse to truncate a file opened with FXF_APPEND?
908                 # <http://allmydata.org/trac/tahoe-lafs/ticket/1037#comment:20>
909                 self.consumer.set_current_size(size)
910             eventually(d.callback, None)
911             return None
912         self.async.addCallbacks(_set, eventually_errback(d))
913         d.addBoth(_convert_error, request)
914         return d
915
916
917 class StoppableList:
918     def __init__(self, items):
919         self.items = items
920     def __iter__(self):
921         for i in self.items:
922             yield i
923     def close(self):
924         pass
925
926
927 class Reason:
928     def __init__(self, value):
929         self.value = value
930
931
932 # A "heisenfile" is a file that has been opened with write flags
933 # (FXF_WRITE and/or FXF_CREAT) and not yet close-notified.
934 # 'all_heisenfiles' maps from a direntry string to a list of
935 # GeneralSFTPFile.
936 #
937 # A direntry string is parent_write_uri + "/" + childname_utf8 for
938 # an immutable file, or file_write_uri for a mutable file.
939 # Updates to this dict are single-threaded.
940
941 all_heisenfiles = {}
942
943 def _reload():
944     global all_heisenfiles
945     all_heisenfiles = {}
946
947 class SFTPUserHandler(ConchUser, PrefixingLogMixin):
948     implements(ISFTPServer)
949     def __init__(self, client, rootnode, username):
950         ConchUser.__init__(self)
951         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=username)
952         if noisy: self.log(".__init__(%r, %r, %r)" % (client, rootnode, username), level=NOISY)
953
954         self.channelLookup["session"] = session.SSHSession
955         self.subsystemLookup["sftp"] = FileTransferServer
956
957         self._client = client
958         self._root = rootnode
959         self._username = username
960         self._convergence = client.convergence
961
962         # maps from UTF-8 paths for this user, to files written and still open
963         self._heisenfiles = {}
964
965     def gotVersion(self, otherVersion, extData):
966         self.log(".gotVersion(%r, %r)" % (otherVersion, extData), level=OPERATIONAL)
967
968         # advertise the same extensions as the OpenSSH SFTP server
969         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
970         return {'posix-rename@openssh.com': '1',
971                 'statvfs@openssh.com': '2',
972                 'fstatvfs@openssh.com': '2',
973                }
974
975     def logout(self):
976         self.log(".logout()", level=OPERATIONAL)
977
978         for files in self._heisenfiles.itervalues():
979             for f in files:
980                 f.abandon()
981
982     def _add_heisenfile_by_path(self, file):
983         self.log("._add_heisenfile_by_path(%r)" % (file,), level=OPERATIONAL)
984
985         userpath = file.get_userpath()
986         if userpath in self._heisenfiles:
987             self._heisenfiles[userpath] += [file]
988         else:
989             self._heisenfiles[userpath] = [file]
990
991     def _add_heisenfile_by_direntry(self, file):
992         self.log("._add_heisenfile_by_direntry(%r)" % (file,), level=OPERATIONAL)
993
994         direntry = file.get_direntry()
995         if direntry:
996             if direntry in all_heisenfiles:
997                 all_heisenfiles[direntry] += [file]
998             else:
999                 all_heisenfiles[direntry] = [file]
1000
1001     def _abandon_any_heisenfiles(self, userpath, direntry):
1002         request = "._abandon_any_heisenfiles(%r, %r)" % (userpath, direntry)
1003         self.log(request, level=OPERATIONAL)
1004
1005         assert isinstance(userpath, str), userpath
1006
1007         # First we synchronously mark all heisenfiles matching the userpath or direntry
1008         # as abandoned, and remove them from the two heisenfile dicts. Then we .sync()
1009         # each file that we abandoned.
1010         #
1011         # For each file, the call to .abandon() occurs:
1012         #   * before the file is closed, in which case it will never be committed
1013         #     (uploaded+linked or published); or
1014         #   * after it is closed but before it has been close_notified, in which case the
1015         #     .sync() ensures that it has been committed (successfully or not) before we
1016         #     return.
1017         #
1018         # This avoids a race that might otherwise cause the file to be committed after
1019         # the remove operation has completed.
1020         #
1021         # We return a Deferred that fires with True if any files were abandoned (this
1022         # does not mean that they were not committed; it is used to determine whether
1023         # a NoSuchChildError from the attempt to delete the file should be suppressed).
1024
1025         files = []
1026         if direntry in all_heisenfiles:
1027             files = all_heisenfiles[direntry]
1028             del all_heisenfiles[direntry]
1029         if userpath in self._heisenfiles:
1030             files += self._heisenfiles[userpath]
1031             del self._heisenfiles[userpath]
1032
1033         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1034
1035         for f in files:
1036             f.abandon()
1037
1038         d = defer.succeed(None)
1039         for f in files:
1040             d.addBoth(f.sync)
1041
1042         def _done(ign):
1043             self.log("done %r" % (request,), level=OPERATIONAL)
1044             return len(files) > 0
1045         d.addBoth(_done)
1046         return d
1047
1048     def _rename_heisenfiles(self, from_userpath, from_parent, from_childname,
1049                             to_userpath, to_parent, to_childname, overwrite=True):
1050         request = ("._rename_heisenfiles(%r, %r, %r, %r, %r, %r, overwrite=%r)" %
1051                    (from_userpath, from_parent, from_childname, to_userpath, to_parent, to_childname, overwrite))
1052         self.log(request, level=OPERATIONAL)
1053
1054         assert (isinstance(from_userpath, str) and isinstance(from_childname, unicode) and
1055                 isinstance(to_userpath, str) and isinstance(to_childname, unicode)), \
1056                (from_userpath, from_childname, to_userpath, to_childname)
1057
1058         if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
1059
1060         # First we synchronously rename all heisenfiles matching the userpath or direntry.
1061         # Then we .sync() each file that we renamed.
1062         #
1063         # For each file, the call to .rename occurs:
1064         #   * before the file is closed, in which case it will be committed at the
1065         #     new direntry; or
1066         #   * after it is closed but before it has been close_notified, in which case the
1067         #     .sync() ensures that it has been committed (successfully or not) before we
1068         #     return.
1069         #
1070         # This avoids a race that might otherwise cause the file to be committed at the
1071         # old name after the rename operation has completed.
1072         #
1073         # Note that if overwrite is False, the caller should already have checked
1074         # whether a real direntry exists at the destination. It is possible that another
1075         # direntry (heisen or real) comes to exist at the destination after that check,
1076         # but in that case it is correct for the rename to succeed (and for the commit
1077         # of the heisenfile at the destination to possibly clobber the other entry, since
1078         # that can happen anyway when we have concurrent write handles to the same direntry).
1079         #
1080         # We return a Deferred that fires with True if any files were renamed (this
1081         # does not mean that they were not committed; it is used to determine whether
1082         # a NoSuchChildError from the rename attempt should be suppressed). If overwrite
1083         # is False and there were already heisenfiles at the destination userpath or
1084         # direntry, we return a Deferred that fails with SFTPError(FX_PERMISSION_DENIED).
1085
1086         from_direntry = _direntry_for(from_parent, from_childname)
1087         to_direntry = _direntry_for(to_parent, to_childname)
1088
1089         if noisy: self.log("from_direntry = %r, to_direntry = %r, len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
1090                            (from_direntry, to_direntry, len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
1091
1092         if not overwrite and (to_userpath in self._heisenfiles or to_direntry in all_heisenfiles):
1093             def _existing(): raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1094             if noisy: self.log("existing", level=NOISY)
1095             return defer.execute(_existing)
1096
1097         from_files = []
1098         if from_direntry in all_heisenfiles:
1099             from_files = all_heisenfiles[from_direntry]
1100             del all_heisenfiles[from_direntry]
1101         if from_userpath in self._heisenfiles:
1102             from_files += self._heisenfiles[from_userpath]
1103             del self._heisenfiles[from_userpath]
1104
1105         if noisy: self.log("from_files = %r in %r" % (from_files, request), level=NOISY)
1106
1107         for f in from_files:
1108             f.rename(to_userpath, to_parent, to_childname)
1109             self._add_heisenfile_by_path(f)
1110             self._add_heisenfile_by_direntry(f)
1111
1112         d = defer.succeed(None)
1113         for f in from_files:
1114             d.addBoth(f.sync)
1115
1116         def _done(ign):
1117             if noisy: self.log("done: len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
1118                                (len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
1119             return len(from_files) > 0
1120         d.addBoth(_done)
1121         return d
1122
1123     def _update_attrs_for_heisenfiles(self, userpath, direntry, attrs):
1124         request = "._update_attrs_for_heisenfiles(%r, %r, %r)" % (userpath, direntry, attrs)
1125         self.log(request, level=OPERATIONAL)
1126
1127         assert isinstance(userpath, str) and isinstance(direntry, str), (userpath, direntry)
1128
1129         files = []
1130         if direntry in all_heisenfiles:
1131             files = all_heisenfiles[direntry]
1132         if userpath in self._heisenfiles:
1133             files += self._heisenfiles[userpath]
1134
1135         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1136
1137         # We set the metadata for all heisenfiles at this path or direntry.
1138         # Since a direntry includes a write URI, we must have authority to
1139         # change the metadata of heisenfiles found in the all_heisenfiles dict.
1140         # However that's not necessarily the case for heisenfiles found by
1141         # path. Therefore we tell the setAttrs method of each file to only
1142         # perform the update if the file is at the correct direntry.
1143
1144         d = defer.succeed(None)
1145         for f in files:
1146             d.addBoth(f.setAttrs, attrs, only_if_at=direntry)
1147
1148         def _done(ign):
1149             self.log("done %r" % (request,), level=OPERATIONAL)
1150             # TODO: this should not return True if only_if_at caused all files to be skipped.
1151             return len(files) > 0
1152         d.addBoth(_done)
1153         return d
1154
1155     def _sync_heisenfiles(self, userpath, direntry, ignore=None):
1156         request = "._sync_heisenfiles(%r, %r, ignore=%r)" % (userpath, direntry, ignore)
1157         self.log(request, level=OPERATIONAL)
1158
1159         assert isinstance(userpath, str) and isinstance(direntry, (str, NoneType)), (userpath, direntry)
1160
1161         files = []
1162         if direntry in all_heisenfiles:
1163             files = all_heisenfiles[direntry]
1164         if userpath in self._heisenfiles:
1165             files += self._heisenfiles[userpath]
1166
1167         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1168
1169         d = defer.succeed(None)
1170         for f in files:
1171             if f is not ignore:
1172                 d.addBoth(f.sync)
1173
1174         def _done(ign):
1175             self.log("done %r" % (request,), level=OPERATIONAL)
1176             return None
1177         d.addBoth(_done)
1178         return d
1179
1180     def _remove_heisenfile(self, userpath, parent, childname, file_to_remove):
1181         if noisy: self.log("._remove_heisenfile(%r, %r, %r, %r)" % (userpath, parent, childname, file_to_remove), level=NOISY)
1182
1183         assert isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)), (userpath, childname)
1184
1185         direntry = _direntry_for(parent, childname)
1186         if direntry in all_heisenfiles:
1187             all_old_files = all_heisenfiles[direntry]
1188             all_new_files = [f for f in all_old_files if f is not file_to_remove]
1189             if len(all_new_files) > 0:
1190                 all_heisenfiles[direntry] = all_new_files
1191             else:
1192                 del all_heisenfiles[direntry]
1193
1194         if userpath in self._heisenfiles:
1195             old_files = self._heisenfiles[userpath]
1196             new_files = [f for f in old_files if f is not file_to_remove]
1197             if len(new_files) > 0:
1198                 self._heisenfiles[userpath] = new_files
1199             else:
1200                 del self._heisenfiles[userpath]
1201
1202         if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
1203
1204     def _make_file(self, existing_file, userpath, flags, parent=None, childname=None, filenode=None, metadata=None):
1205         if noisy: self.log("._make_file(%r, %r, %r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
1206                            (existing_file, userpath, flags, _repr_flags(flags), parent, childname, filenode, metadata),
1207                            level=NOISY)
1208
1209         assert (isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)) and
1210                 (metadata is None or 'no-write' in metadata)), (userpath, childname, metadata)
1211
1212         writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
1213         direntry = _direntry_for(parent, childname, filenode)
1214
1215         d = self._sync_heisenfiles(userpath, direntry, ignore=existing_file)
1216
1217         if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
1218             d.addCallback(lambda ign: ShortReadOnlySFTPFile(userpath, filenode, metadata))
1219         else:
1220             close_notify = None
1221             if writing:
1222                 close_notify = self._remove_heisenfile
1223
1224             d.addCallback(lambda ign: existing_file or GeneralSFTPFile(userpath, flags, close_notify, self._convergence))
1225             def _got_file(file):
1226                 file.open(parent=parent, childname=childname, filenode=filenode, metadata=metadata)
1227                 if writing:
1228                     self._add_heisenfile_by_direntry(file)
1229                 return file
1230             d.addCallback(_got_file)
1231         return d
1232
1233     def openFile(self, pathstring, flags, attrs, delay=None):
1234         request = ".openFile(%r, %r = %r, %r, delay=%r)" % (pathstring, flags, _repr_flags(flags), attrs, delay)
1235         self.log(request, level=OPERATIONAL)
1236
1237         # This is used for both reading and writing.
1238         # First exclude invalid combinations of flags, and empty paths.
1239
1240         if not (flags & (FXF_READ | FXF_WRITE)):
1241             def _bad_readwrite():
1242                 raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
1243             return defer.execute(_bad_readwrite)
1244
1245         if (flags & FXF_EXCL) and not (flags & FXF_CREAT):
1246             def _bad_exclcreat():
1247                 raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT")
1248             return defer.execute(_bad_exclcreat)
1249
1250         path = self._path_from_string(pathstring)
1251         if not path:
1252             def _emptypath(): raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty")
1253             return defer.execute(_emptypath)
1254
1255         # The combination of flags is potentially valid.
1256
1257         # To work around clients that have race condition bugs, a getAttr, rename, or
1258         # remove request following an 'open' request with FXF_WRITE or FXF_CREAT flags,
1259         # should succeed even if the 'open' request has not yet completed. So we now
1260         # synchronously add a file object into the self._heisenfiles dict, indexed
1261         # by its UTF-8 userpath. (We can't yet add it to the all_heisenfiles dict,
1262         # because we don't yet have a user-independent path for the file.) The file
1263         # object does not know its filenode, parent, or childname at this point.
1264
1265         userpath = self._path_to_utf8(path)
1266
1267         if flags & (FXF_WRITE | FXF_CREAT):
1268             file = GeneralSFTPFile(userpath, flags, self._remove_heisenfile, self._convergence)
1269             self._add_heisenfile_by_path(file)
1270         else:
1271             # We haven't decided which file implementation to use yet.
1272             file = None
1273
1274         desired_metadata = _attrs_to_metadata(attrs)
1275
1276         # Now there are two major cases:
1277         #
1278         #  1. The path is specified as /uri/FILECAP, with no parent directory.
1279         #     If the FILECAP is mutable and writeable, then we can open it in write-only
1280         #     or read/write mode (non-exclusively), otherwise we can only open it in
1281         #     read-only mode. The open should succeed immediately as long as FILECAP is
1282         #     a valid known filecap that grants the required permission.
1283         #
1284         #  2. The path is specified relative to a parent. We find the parent dirnode and
1285         #     get the child's URI and metadata if it exists. There are four subcases:
1286         #       a. the child does not exist: FXF_CREAT must be set, and we must be able
1287         #          to write to the parent directory.
1288         #       b. the child exists but is not a valid known filecap: fail
1289         #       c. the child is mutable: if we are trying to open it write-only or
1290         #          read/write, then we must be able to write to the file.
1291         #       d. the child is immutable: if we are trying to open it write-only or
1292         #          read/write, then we must be able to write to the parent directory.
1293         #
1294         # To reduce latency, open normally succeeds as soon as these conditions are
1295         # met, even though there might be a failure in downloading the existing file
1296         # or uploading a new one. However, there is an exception: if a file has been
1297         # written, then closed, and is now being reopened, then we have to delay the
1298         # open until the previous upload/publish has completed. This is necessary
1299         # because sshfs does not wait for the result of an FXF_CLOSE message before
1300         # reporting to the client that a file has been closed. It applies both to
1301         # mutable files, and to directory entries linked to an immutable file.
1302         #
1303         # Note that the permission checks below are for more precise error reporting on
1304         # the open call; later operations would fail even if we did not make these checks.
1305
1306         d = delay or defer.succeed(None)
1307         d.addCallback(lambda ign: self._get_root(path))
1308         def _got_root( (root, path) ):
1309             if root.is_unknown():
1310                 raise SFTPError(FX_PERMISSION_DENIED,
1311                                 "cannot open an unknown cap (or child of an unknown object). "
1312                                 "Upgrading the gateway to a later Tahoe-LAFS version may help")
1313             if not path:
1314                 # case 1
1315                 if noisy: self.log("case 1: root = %r, path[:-1] = %r" % (root, path[:-1]), level=NOISY)
1316                 if not IFileNode.providedBy(root):
1317                     raise SFTPError(FX_PERMISSION_DENIED,
1318                                     "cannot open a directory cap")
1319                 if (flags & FXF_WRITE) and root.is_readonly():
1320                     raise SFTPError(FX_PERMISSION_DENIED,
1321                                     "cannot write to a non-writeable filecap without a parent directory")
1322                 if flags & FXF_EXCL:
1323                     raise SFTPError(FX_FAILURE,
1324                                     "cannot create a file exclusively when it already exists")
1325
1326                 # The file does not need to be added to all_heisenfiles, because it is not
1327                 # associated with a directory entry that needs to be updated.
1328
1329                 metadata = update_metadata(None, desired_metadata, time())
1330
1331                 # We have to decide what to pass for the 'parent_readonly' argument to _no_write,
1332                 # given that we don't actually have a parent. This only affects the permissions
1333                 # reported by a getAttrs on this file handle in the case of an immutable file.
1334                 # We choose 'parent_readonly=True' since that will cause the permissions to be
1335                 # reported as r--r--r--, which is appropriate because an immutable file can't be
1336                 # written via this path.
1337
1338                 metadata['no-write'] = _no_write(True, root)
1339                 return self._make_file(file, userpath, flags, filenode=root, metadata=metadata)
1340             else:
1341                 # case 2
1342                 childname = path[-1]
1343
1344                 if noisy: self.log("case 2: root = %r, childname = %r, desired_metadata = %r, path[:-1] = %r" %
1345                                    (root, childname, desired_metadata, path[:-1]), level=NOISY)
1346                 d2 = root.get_child_at_path(path[:-1])
1347                 def _got_parent(parent):
1348                     if noisy: self.log("_got_parent(%r)" % (parent,), level=NOISY)
1349                     if parent.is_unknown():
1350                         raise SFTPError(FX_PERMISSION_DENIED,
1351                                         "cannot open a child of an unknown object. "
1352                                         "Upgrading the gateway to a later Tahoe-LAFS version may help")
1353
1354                     parent_readonly = parent.is_readonly()
1355                     d3 = defer.succeed(None)
1356                     if flags & FXF_EXCL:
1357                         # FXF_EXCL means that the link to the file (not the file itself) must
1358                         # be created atomically wrt updates by this storage client.
1359                         # That is, we need to create the link before returning success to the
1360                         # SFTP open request (and not just on close, as would normally be the
1361                         # case). We make the link initially point to a zero-length LIT file,
1362                         # which is consistent with what might happen on a POSIX filesystem.
1363
1364                         if parent_readonly:
1365                             raise SFTPError(FX_FAILURE,
1366                                             "cannot create a file exclusively when the parent directory is read-only")
1367
1368                         # 'overwrite=False' ensures failure if the link already exists.
1369                         # FIXME: should use a single call to set_uri and return (child, metadata) (#1035)
1370
1371                         zero_length_lit = "URI:LIT:"
1372                         if noisy: self.log("%r.set_uri(%r, None, readcap=%r, overwrite=False)" %
1373                                            (parent, zero_length_lit, childname), level=NOISY)
1374                         d3.addCallback(lambda ign: parent.set_uri(childname, None, readcap=zero_length_lit,
1375                                                                   metadata=desired_metadata, overwrite=False))
1376                         def _seturi_done(child):
1377                             if noisy: self.log("%r.get_metadata_for(%r)" % (parent, childname), level=NOISY)
1378                             d4 = parent.get_metadata_for(childname)
1379                             d4.addCallback(lambda metadata: (child, metadata))
1380                             return d4
1381                         d3.addCallback(_seturi_done)
1382                     else:
1383                         if noisy: self.log("%r.get_child_and_metadata(%r)" % (parent, childname), level=NOISY)
1384                         d3.addCallback(lambda ign: parent.get_child_and_metadata(childname))
1385
1386                     def _got_child( (filenode, current_metadata) ):
1387                         if noisy: self.log("_got_child( (%r, %r) )" % (filenode, current_metadata), level=NOISY)
1388
1389                         metadata = update_metadata(current_metadata, desired_metadata, time())
1390
1391                         # Ignore the permissions of the desired_metadata in an open call. The permissions
1392                         # can only be set by setAttrs.
1393                         metadata['no-write'] = _no_write(parent_readonly, filenode, current_metadata)
1394
1395                         if filenode.is_unknown():
1396                             raise SFTPError(FX_PERMISSION_DENIED,
1397                                             "cannot open an unknown cap. Upgrading the gateway "
1398                                             "to a later Tahoe-LAFS version may help")
1399                         if not IFileNode.providedBy(filenode):
1400                             raise SFTPError(FX_PERMISSION_DENIED,
1401                                             "cannot open a directory as if it were a file")
1402                         if (flags & FXF_WRITE) and metadata['no-write']:
1403                             raise SFTPError(FX_PERMISSION_DENIED,
1404                                             "cannot open a non-writeable file for writing")
1405
1406                         return self._make_file(file, userpath, flags, parent=parent, childname=childname,
1407                                                filenode=filenode, metadata=metadata)
1408                     def _no_child(f):
1409                         if noisy: self.log("_no_child(%r)" % (f,), level=NOISY)
1410                         f.trap(NoSuchChildError)
1411
1412                         if not (flags & FXF_CREAT):
1413                             raise SFTPError(FX_NO_SUCH_FILE,
1414                                             "the file does not exist, and was not opened with the creation (CREAT) flag")
1415                         if parent_readonly:
1416                             raise SFTPError(FX_PERMISSION_DENIED,
1417                                             "cannot create a file when the parent directory is read-only")
1418
1419                         return self._make_file(file, userpath, flags, parent=parent, childname=childname)
1420                     d3.addCallbacks(_got_child, _no_child)
1421                     return d3
1422
1423                 d2.addCallback(_got_parent)
1424                 return d2
1425
1426         d.addCallback(_got_root)
1427         def _remove_on_error(err):
1428             if file:
1429                 self._remove_heisenfile(userpath, None, None, file)
1430             return err
1431         d.addErrback(_remove_on_error)
1432         d.addBoth(_convert_error, request)
1433         return d
1434
1435     def renameFile(self, from_pathstring, to_pathstring, overwrite=False):
1436         request = ".renameFile(%r, %r)" % (from_pathstring, to_pathstring)
1437         self.log(request, level=OPERATIONAL)
1438
1439         from_path = self._path_from_string(from_pathstring)
1440         to_path = self._path_from_string(to_pathstring)
1441         from_userpath = self._path_to_utf8(from_path)
1442         to_userpath = self._path_to_utf8(to_path)
1443
1444         # the target directory must already exist
1445         d = deferredutil.gatherResults([self._get_parent_or_node(from_path),
1446                                         self._get_parent_or_node(to_path)])
1447         def _got( (from_pair, to_pair) ):
1448             if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r, overwrite=%r)" %
1449                                (from_pair, to_pair, from_pathstring, to_pathstring, overwrite), level=NOISY)
1450             (from_parent, from_childname) = from_pair
1451             (to_parent, to_childname) = to_pair
1452
1453             if from_childname is None:
1454                 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename a source object specified by URI")
1455             if to_childname is None:
1456                 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename to a destination specified by URI")
1457
1458             # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.5>
1459             # "It is an error if there already exists a file with the name specified
1460             #  by newpath."
1461             # OpenSSH's SFTP server returns FX_PERMISSION_DENIED for this error.
1462             #
1463             # For the standard SSH_FXP_RENAME operation, overwrite=False.
1464             # We also support the posix-rename@openssh.com extension, which uses overwrite=True.
1465
1466             d2 = defer.succeed(None)
1467             if not overwrite:
1468                 d2.addCallback(lambda ign: to_parent.get(to_childname))
1469                 def _expect_fail(res):
1470                     if not isinstance(res, Failure):
1471                         raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1472
1473                     # It is OK if we fail for errors other than NoSuchChildError, since that probably
1474                     # indicates some problem accessing the destination directory.
1475                     res.trap(NoSuchChildError)
1476                 d2.addBoth(_expect_fail)
1477
1478             # If there are heisenfiles to be written at the 'from' direntry, then ensure
1479             # they will now be written at the 'to' direntry instead.
1480             d2.addCallback(lambda ign:
1481                            self._rename_heisenfiles(from_userpath, from_parent, from_childname,
1482                                                     to_userpath, to_parent, to_childname, overwrite=overwrite))
1483
1484             def _move(renamed):
1485                 # FIXME: use move_child_to_path to avoid possible data loss due to #943
1486                 #d3 = from_parent.move_child_to_path(from_childname, to_root, to_path, overwrite=overwrite)
1487
1488                 d3 = from_parent.move_child_to(from_childname, to_parent, to_childname, overwrite=overwrite)
1489                 def _check(err):
1490                     if noisy: self.log("_check(%r) in .renameFile(%r, %r, overwrite=%r)" %
1491                                        (err, from_pathstring, to_pathstring, overwrite), level=NOISY)
1492
1493                     if not isinstance(err, Failure) or (renamed and err.check(NoSuchChildError)):
1494                         return None
1495                     if not overwrite and err.check(ExistingChildError):
1496                         raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1497
1498                     return err
1499                 d3.addBoth(_check)
1500                 return d3
1501             d2.addCallback(_move)
1502             return d2
1503         d.addCallback(_got)
1504         d.addBoth(_convert_error, request)
1505         return d
1506
1507     def makeDirectory(self, pathstring, attrs):
1508         request = ".makeDirectory(%r, %r)" % (pathstring, attrs)
1509         self.log(request, level=OPERATIONAL)
1510
1511         path = self._path_from_string(pathstring)
1512         metadata = _attrs_to_metadata(attrs)
1513         if 'no-write' in metadata:
1514             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "cannot create a directory that is initially read-only")
1515             return defer.execute(_denied)
1516
1517         d = self._get_root(path)
1518         d.addCallback(lambda (root, path):
1519                       self._get_or_create_directories(root, path, metadata))
1520         d.addBoth(_convert_error, request)
1521         return d
1522
1523     def _get_or_create_directories(self, node, path, metadata):
1524         if not IDirectoryNode.providedBy(node):
1525             # TODO: provide the name of the blocking file in the error message.
1526             def _blocked(): raise SFTPError(FX_FAILURE, "cannot create directory because there "
1527                                                         "is a file in the way") # close enough
1528             return defer.execute(_blocked)
1529
1530         if not path:
1531             return defer.succeed(node)
1532         d = node.get(path[0])
1533         def _maybe_create(f):
1534             f.trap(NoSuchChildError)
1535             return node.create_subdirectory(path[0])
1536         d.addErrback(_maybe_create)
1537         d.addCallback(self._get_or_create_directories, path[1:], metadata)
1538         return d
1539
1540     def removeFile(self, pathstring):
1541         request = ".removeFile(%r)" % (pathstring,)
1542         self.log(request, level=OPERATIONAL)
1543
1544         path = self._path_from_string(pathstring)
1545         d = self._remove_object(path, must_be_file=True)
1546         d.addBoth(_convert_error, request)
1547         return d
1548
1549     def removeDirectory(self, pathstring):
1550         request = ".removeDirectory(%r)" % (pathstring,)
1551         self.log(request, level=OPERATIONAL)
1552
1553         path = self._path_from_string(pathstring)
1554         d = self._remove_object(path, must_be_directory=True)
1555         d.addBoth(_convert_error, request)
1556         return d
1557
1558     def _remove_object(self, path, must_be_directory=False, must_be_file=False):
1559         userpath = self._path_to_utf8(path)
1560         d = self._get_parent_or_node(path)
1561         def _got_parent( (parent, childname) ):
1562             if childname is None:
1563                 raise SFTPError(FX_NO_SUCH_FILE, "cannot remove an object specified by URI")
1564
1565             direntry = _direntry_for(parent, childname)
1566             d2 = defer.succeed(False)
1567             if not must_be_directory:
1568                 d2.addCallback(lambda ign: self._abandon_any_heisenfiles(userpath, direntry))
1569
1570             d2.addCallback(lambda abandoned:
1571                            parent.delete(childname, must_exist=not abandoned,
1572                                          must_be_directory=must_be_directory, must_be_file=must_be_file))
1573             return d2
1574         d.addCallback(_got_parent)
1575         return d
1576
1577     def openDirectory(self, pathstring):
1578         request = ".openDirectory(%r)" % (pathstring,)
1579         self.log(request, level=OPERATIONAL)
1580
1581         path = self._path_from_string(pathstring)
1582         d = self._get_parent_or_node(path)
1583         def _got_parent_or_node( (parent_or_node, childname) ):
1584             if noisy: self.log("_got_parent_or_node( (%r, %r) ) in openDirectory(%r)" %
1585                                (parent_or_node, childname, pathstring), level=NOISY)
1586             if childname is None:
1587                 return parent_or_node
1588             else:
1589                 return parent_or_node.get(childname)
1590         d.addCallback(_got_parent_or_node)
1591         def _list(dirnode):
1592             if dirnode.is_unknown():
1593                 raise SFTPError(FX_PERMISSION_DENIED,
1594                                 "cannot list an unknown cap as a directory. Upgrading the gateway "
1595                                 "to a later Tahoe-LAFS version may help")
1596             if not IDirectoryNode.providedBy(dirnode):
1597                 raise SFTPError(FX_PERMISSION_DENIED,
1598                                 "cannot list a file as if it were a directory")
1599
1600             d2 = dirnode.list()
1601             def _render(children):
1602                 parent_readonly = dirnode.is_readonly()
1603                 results = []
1604                 for filename, (child, metadata) in children.iteritems():
1605                     # The file size may be cached or absent.
1606                     metadata['no-write'] = _no_write(parent_readonly, child, metadata)
1607                     attrs = _populate_attrs(child, metadata)
1608                     filename_utf8 = filename.encode('utf-8')
1609                     longname = _lsLine(filename_utf8, attrs)
1610                     results.append( (filename_utf8, longname, attrs) )
1611                 return StoppableList(results)
1612             d2.addCallback(_render)
1613             return d2
1614         d.addCallback(_list)
1615         d.addBoth(_convert_error, request)
1616         return d
1617
1618     def getAttrs(self, pathstring, followLinks):
1619         request = ".getAttrs(%r, followLinks=%r)" % (pathstring, followLinks)
1620         self.log(request, level=OPERATIONAL)
1621
1622         # When asked about a specific file, report its current size.
1623         # TODO: the modification time for a mutable file should be
1624         # reported as the update time of the best version. But that
1625         # information isn't currently stored in mutable shares, I think.
1626
1627         path = self._path_from_string(pathstring)
1628         userpath = self._path_to_utf8(path)
1629         d = self._get_parent_or_node(path)
1630         def _got_parent_or_node( (parent_or_node, childname) ):
1631             if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
1632
1633             # Some clients will incorrectly try to get the attributes
1634             # of a file immediately after opening it, before it has been put
1635             # into the all_heisenfiles table. This is a race condition bug in
1636             # the client, but we handle it anyway by calling .sync() on all
1637             # files matching either the path or the direntry.
1638
1639             direntry = _direntry_for(parent_or_node, childname)
1640             d2 = self._sync_heisenfiles(userpath, direntry)
1641
1642             if childname is None:
1643                 node = parent_or_node
1644                 d2.addCallback(lambda ign: node.get_current_size())
1645                 d2.addCallback(lambda size:
1646                                _populate_attrs(node, {'no-write': node.is_unknown() or node.is_readonly()}, size=size))
1647             else:
1648                 parent = parent_or_node
1649                 d2.addCallback(lambda ign: parent.get_child_and_metadata_at_path([childname]))
1650                 def _got( (child, metadata) ):
1651                     if noisy: self.log("_got( (%r, %r) )" % (child, metadata), level=NOISY)
1652                     assert IDirectoryNode.providedBy(parent), parent
1653                     metadata['no-write'] = _no_write(parent.is_readonly(), child, metadata)
1654                     d3 = child.get_current_size()
1655                     d3.addCallback(lambda size: _populate_attrs(child, metadata, size=size))
1656                     return d3
1657                 def _nosuch(err):
1658                     if noisy: self.log("_nosuch(%r)" % (err,), level=NOISY)
1659                     err.trap(NoSuchChildError)
1660                     if noisy: self.log("checking open files:\nself._heisenfiles = %r\nall_heisenfiles = %r\ndirentry=%r" %
1661                                        (self._heisenfiles, all_heisenfiles, direntry), level=NOISY)
1662                     if direntry in all_heisenfiles:
1663                         files = all_heisenfiles[direntry]
1664                         if len(files) == 0:  # pragma: no cover
1665                             return err
1666                         # use the heisenfile that was most recently opened
1667                         return files[-1].getAttrs()
1668                     return err
1669                 d2.addCallbacks(_got, _nosuch)
1670             return d2
1671         d.addCallback(_got_parent_or_node)
1672         d.addBoth(_convert_error, request)
1673         return d
1674
1675     def setAttrs(self, pathstring, attrs):
1676         request = ".setAttrs(%r, %r)" % (pathstring, attrs)
1677         self.log(request, level=OPERATIONAL)
1678
1679         if "size" in attrs:
1680             # this would require us to download and re-upload the truncated/extended
1681             # file contents
1682             def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "setAttrs wth size attribute unsupported")
1683             return defer.execute(_unsupported)
1684
1685         path = self._path_from_string(pathstring)
1686         userpath = self._path_to_utf8(path)
1687         d = self._get_parent_or_node(path)
1688         def _got_parent_or_node( (parent_or_node, childname) ):
1689             if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
1690
1691             direntry = _direntry_for(parent_or_node, childname)
1692             d2 = self._update_attrs_for_heisenfiles(userpath, direntry, attrs)
1693
1694             def _update(updated_heisenfiles):
1695                 if childname is None:
1696                     if updated_heisenfiles:
1697                         return None
1698                     raise SFTPError(FX_NO_SUCH_FILE, userpath)
1699                 else:
1700                     desired_metadata = _attrs_to_metadata(attrs)
1701                     if noisy: self.log("desired_metadata = %r" % (desired_metadata,), level=NOISY)
1702
1703                     d3 = parent_or_node.set_metadata_for(childname, desired_metadata)
1704                     def _nosuch(err):
1705                         if updated_heisenfiles:
1706                             err.trap(NoSuchChildError)
1707                         else:
1708                             return err
1709                     d3.addErrback(_nosuch)
1710                     return d3
1711             d2.addCallback(_update)
1712             d2.addCallback(lambda ign: None)
1713             return d2
1714         d.addCallback(_got_parent_or_node)
1715         d.addBoth(_convert_error, request)
1716         return d
1717
1718     def readLink(self, pathstring):
1719         self.log(".readLink(%r)" % (pathstring,), level=OPERATIONAL)
1720
1721         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "readLink")
1722         return defer.execute(_unsupported)
1723
1724     def makeLink(self, linkPathstring, targetPathstring):
1725         self.log(".makeLink(%r, %r)" % (linkPathstring, targetPathstring), level=OPERATIONAL)
1726
1727         # If this is implemented, note the reversal of arguments described in point 7 of
1728         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>.
1729
1730         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "makeLink")
1731         return defer.execute(_unsupported)
1732
1733     def extendedRequest(self, extensionName, extensionData):
1734         self.log(".extendedRequest(%r, <data of length %r>)" % (extensionName, len(extensionData)), level=OPERATIONAL)
1735
1736         # We implement the three main OpenSSH SFTP extensions; see
1737         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
1738
1739         if extensionName == 'posix-rename@openssh.com':
1740             def _bad(): raise SFTPError(FX_BAD_MESSAGE, "could not parse posix-rename@openssh.com request")
1741
1742             if 4 > len(extensionData): return defer.execute(_bad)
1743             (fromPathLen,) = struct.unpack('>L', extensionData[0:4])
1744             if 8 + fromPathLen > len(extensionData): return defer.execute(_bad)
1745
1746             (toPathLen,) = struct.unpack('>L', extensionData[(4 + fromPathLen):(8 + fromPathLen)])
1747             if 8 + fromPathLen + toPathLen != len(extensionData): return defer.execute(_bad)
1748
1749             fromPathstring = extensionData[4:(4 + fromPathLen)]
1750             toPathstring = extensionData[(8 + fromPathLen):]
1751             d = self.renameFile(fromPathstring, toPathstring, overwrite=True)
1752
1753             # Twisted conch assumes that the response from an extended request is either
1754             # an error, or an FXP_EXTENDED_REPLY. But it happens to do the right thing
1755             # (respond with an FXP_STATUS message) if we return a Failure with code FX_OK.
1756             def _succeeded(ign):
1757                 raise SFTPError(FX_OK, "request succeeded")
1758             d.addCallback(_succeeded)
1759             return d
1760
1761         if extensionName == 'statvfs@openssh.com' or extensionName == 'fstatvfs@openssh.com':
1762             # f_bsize and f_frsize should be the same to avoid a bug in 'df'
1763             return defer.succeed(struct.pack('>11Q',
1764                 1024,         # uint64  f_bsize     /* file system block size */
1765                 1024,         # uint64  f_frsize    /* fundamental fs block size */
1766                 628318530,    # uint64  f_blocks    /* number of blocks (unit f_frsize) */
1767                 314159265,    # uint64  f_bfree     /* free blocks in file system */
1768                 314159265,    # uint64  f_bavail    /* free blocks for non-root */
1769                 200000000,    # uint64  f_files     /* total file inodes */
1770                 100000000,    # uint64  f_ffree     /* free file inodes */
1771                 100000000,    # uint64  f_favail    /* free file inodes for non-root */
1772                 0x1AF5,       # uint64  f_fsid      /* file system id */
1773                 2,            # uint64  f_flag      /* bit mask = ST_NOSUID; not ST_RDONLY */
1774                 65535,        # uint64  f_namemax   /* maximum filename length */
1775                 ))
1776
1777         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "unsupported %r request <data of length %r>" %
1778                                                                (extensionName, len(extensionData)))
1779         return defer.execute(_unsupported)
1780
1781     def realPath(self, pathstring):
1782         self.log(".realPath(%r)" % (pathstring,), level=OPERATIONAL)
1783
1784         return self._path_to_utf8(self._path_from_string(pathstring))
1785
1786     def _path_to_utf8(self, path):
1787         return (u"/" + u"/".join(path)).encode('utf-8')
1788
1789     def _path_from_string(self, pathstring):
1790         if noisy: self.log("CONVERT %r" % (pathstring,), level=NOISY)
1791
1792         assert isinstance(pathstring, str), pathstring
1793
1794         # The home directory is the root directory.
1795         pathstring = pathstring.strip("/")
1796         if pathstring == "" or pathstring == ".":
1797             path_utf8 = []
1798         else:
1799             path_utf8 = pathstring.split("/")
1800
1801         # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.2>
1802         # "Servers SHOULD interpret a path name component ".." as referring to
1803         #  the parent directory, and "." as referring to the current directory."
1804         path = []
1805         for p_utf8 in path_utf8:
1806             if p_utf8 == "..":
1807                 # ignore excess .. components at the root
1808                 if len(path) > 0:
1809                     path = path[:-1]
1810             elif p_utf8 != ".":
1811                 try:
1812                     p = p_utf8.decode('utf-8', 'strict')
1813                 except UnicodeError:
1814                     raise SFTPError(FX_NO_SUCH_FILE, "path could not be decoded as UTF-8")
1815                 path.append(p)
1816
1817         if noisy: self.log(" PATH %r" % (path,), level=NOISY)
1818         return path
1819
1820     def _get_root(self, path):
1821         # return Deferred (root, remaining_path)
1822         d = defer.succeed(None)
1823         if path and path[0] == u"uri":
1824             d.addCallback(lambda ign: self._client.create_node_from_uri(path[1].encode('utf-8')))
1825             d.addCallback(lambda root: (root, path[2:]))
1826         else:
1827             d.addCallback(lambda ign: (self._root, path))
1828         return d
1829
1830     def _get_parent_or_node(self, path):
1831         # return Deferred (parent, childname) or (node, None)
1832         d = self._get_root(path)
1833         def _got_root( (root, remaining_path) ):
1834             if not remaining_path:
1835                 return (root, None)
1836             else:
1837                 d2 = root.get_child_at_path(remaining_path[:-1])
1838                 d2.addCallback(lambda parent: (parent, remaining_path[-1]))
1839                 return d2
1840         d.addCallback(_got_root)
1841         return d
1842
1843
1844 class FakeTransport:
1845     implements(ITransport)
1846     def write(self, data):
1847         logmsg("FakeTransport.write(<data of length %r>)" % (len(data),), level=NOISY)
1848
1849     def writeSequence(self, data):
1850         logmsg("FakeTransport.writeSequence(...)", level=NOISY)
1851
1852     def loseConnection(self):
1853         logmsg("FakeTransport.loseConnection()", level=NOISY)
1854
1855     # getPeer and getHost can just raise errors, since we don't know what to return
1856
1857
1858 class ShellSession(PrefixingLogMixin):
1859     implements(ISession)
1860     def __init__(self, userHandler):
1861         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
1862         if noisy: self.log(".__init__(%r)" % (userHandler), level=NOISY)
1863
1864     def getPty(self, terminal, windowSize, attrs):
1865         self.log(".getPty(%r, %r, %r)" % (terminal, windowSize, attrs), level=OPERATIONAL)
1866
1867     def openShell(self, protocol):
1868         self.log(".openShell(%r)" % (protocol,), level=OPERATIONAL)
1869         if hasattr(protocol, 'transport') and protocol.transport is None:
1870             protocol.transport = FakeTransport()  # work around Twisted bug
1871
1872         return self._unsupported(protocol)
1873
1874     def execCommand(self, protocol, cmd):
1875         self.log(".execCommand(%r, %r)" % (protocol, cmd), level=OPERATIONAL)
1876         if hasattr(protocol, 'transport') and protocol.transport is None:
1877             protocol.transport = FakeTransport()  # work around Twisted bug
1878
1879         d = defer.succeed(None)
1880         if cmd == "df -P -k /":
1881             d.addCallback(lambda ign: protocol.write(
1882                           "Filesystem         1024-blocks      Used Available Capacity Mounted on\r\n"
1883                           "tahoe                628318530 314159265 314159265      50% /\r\n"))
1884             d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessDone(None))))
1885         else:
1886             d.addCallback(lambda ign: self._unsupported(protocol))
1887         return d
1888
1889     def _unsupported(self, protocol):
1890         d = defer.succeed(None)
1891         d.addCallback(lambda ign: protocol.errReceived(
1892                       "This server supports only the SFTP protocol. It does not support SCP,\r\n"
1893                       "interactive shell sessions, or commands other than one needed by sshfs.\r\n"))
1894         d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessTerminated(exitCode=1))))
1895         return d
1896
1897     def windowChanged(self, newWindowSize):
1898         self.log(".windowChanged(%r)" % (newWindowSize,), level=OPERATIONAL)
1899
1900     def eofReceived(self):
1901         self.log(".eofReceived()", level=OPERATIONAL)
1902
1903     def closed(self):
1904         self.log(".closed()", level=OPERATIONAL)
1905
1906
1907 # If you have an SFTPUserHandler and want something that provides ISession, you get
1908 # ShellSession(userHandler).
1909 # We use adaptation because this must be a different object to the SFTPUserHandler.
1910 components.registerAdapter(ShellSession, SFTPUserHandler, ISession)
1911
1912
1913 from allmydata.frontends.auth import AccountURLChecker, AccountFileChecker, NeedRootcapLookupScheme
1914
1915 class Dispatcher:
1916     implements(portal.IRealm)
1917     def __init__(self, client):
1918         self._client = client
1919
1920     def requestAvatar(self, avatarID, mind, interface):
1921         assert interface == IConchUser, interface
1922         rootnode = self._client.create_node_from_uri(avatarID.rootcap)
1923         handler = SFTPUserHandler(self._client, rootnode, avatarID.username)
1924         return (interface, handler, handler.logout)
1925
1926
1927 class SFTPServer(service.MultiService):
1928     def __init__(self, client, accountfile, accounturl,
1929                  sftp_portstr, pubkey_file, privkey_file):
1930         service.MultiService.__init__(self)
1931
1932         r = Dispatcher(client)
1933         p = portal.Portal(r)
1934
1935         if accountfile:
1936             c = AccountFileChecker(self, accountfile)
1937             p.registerChecker(c)
1938         if accounturl:
1939             c = AccountURLChecker(self, accounturl)
1940             p.registerChecker(c)
1941         if not accountfile and not accounturl:
1942             # we could leave this anonymous, with just the /uri/CAP form
1943             raise NeedRootcapLookupScheme("must provide an account file or URL")
1944
1945         pubkey = keys.Key.fromFile(pubkey_file)
1946         privkey = keys.Key.fromFile(privkey_file)
1947         class SSHFactory(factory.SSHFactory):
1948             publicKeys = {pubkey.sshType(): pubkey}
1949             privateKeys = {privkey.sshType(): privkey}
1950             def getPrimes(self):
1951                 try:
1952                     # if present, this enables diffie-hellman-group-exchange
1953                     return primes.parseModuliFile("/etc/ssh/moduli")
1954                 except IOError:
1955                     return None
1956
1957         f = SSHFactory()
1958         f.portal = p
1959
1960         s = strports.service(sftp_portstr, f)
1961         s.setServiceParent(self)