]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/sftpd.py
SFTP: add a comment about a subtle interaction between OverwriteableFileConsumer...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / sftpd.py
1
2 import heapq, traceback, array, stat, struct
3 from types import NoneType
4 from stat import S_IFREG, S_IFDIR
5 from time import time, strftime, localtime
6
7 from zope.interface import implements
8 from twisted.python import components
9 from twisted.application import service, strports
10 from twisted.conch.ssh import factory, keys, session
11 from twisted.conch.ssh.filetransfer import FileTransferServer, SFTPError, \
12      FX_NO_SUCH_FILE, FX_OP_UNSUPPORTED, FX_PERMISSION_DENIED, FX_EOF, \
13      FX_BAD_MESSAGE, FX_FAILURE, FX_OK
14 from twisted.conch.ssh.filetransfer import FXF_READ, FXF_WRITE, FXF_APPEND, \
15      FXF_CREAT, FXF_TRUNC, FXF_EXCL
16 from twisted.conch.interfaces import ISFTPServer, ISFTPFile, IConchUser, ISession
17 from twisted.conch.avatar import ConchUser
18 from twisted.conch.openssh_compat import primes
19 from twisted.cred import portal
20 from twisted.internet.error import ProcessDone, ProcessTerminated
21 from twisted.python.failure import Failure
22 from twisted.internet.interfaces import ITransport
23
24 from twisted.internet import defer
25 from twisted.internet.interfaces import IFinishableConsumer
26 from foolscap.api import eventually
27 from allmydata.util import deferredutil
28
29 from allmydata.util.consumer import download_to_data
30 from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
31      NoSuchChildError, ChildOfWrongTypeError
32 from allmydata.mutable.common import NotWriteableError
33 from allmydata.mutable.publish import MutableFileHandle
34 from allmydata.immutable.upload import FileHandle
35 from allmydata.dirnode import update_metadata
36 from allmydata.util.fileutil import EncryptedTemporaryFile
37
38 noisy = True
39 use_foolscap_logging = True
40
41 from allmydata.util.log import NOISY, OPERATIONAL, WEIRD, \
42     msg as _msg, err as _err, PrefixingLogMixin as _PrefixingLogMixin
43
44 if use_foolscap_logging:
45     (logmsg, logerr, PrefixingLogMixin) = (_msg, _err, _PrefixingLogMixin)
46 else:  # pragma: no cover
47     def logmsg(s, level=None):
48         print s
49     def logerr(s, level=None):
50         print s
51     class PrefixingLogMixin:
52         def __init__(self, facility=None, prefix=''):
53             self.prefix = prefix
54         def log(self, s, level=None):
55             print "%r %s" % (self.prefix, s)
56
57
58 def eventually_callback(d):
59     return lambda res: eventually(d.callback, res)
60
61 def eventually_errback(d):
62     return lambda err: eventually(d.errback, err)
63
64
65 def _utf8(x):
66     if isinstance(x, unicode):
67         return x.encode('utf-8')
68     if isinstance(x, str):
69         return x
70     return repr(x)
71
72
73 def _to_sftp_time(t):
74     """SFTP times are unsigned 32-bit integers representing UTC seconds
75     (ignoring leap seconds) since the Unix epoch, January 1 1970 00:00 UTC.
76     A Tahoe time is the corresponding float."""
77     return long(t) & 0xFFFFFFFFL
78
79
80 def _convert_error(res, request):
81     """If res is not a Failure, return it, otherwise reraise the appropriate
82     SFTPError."""
83
84     if not isinstance(res, Failure):
85         logged_res = res
86         if isinstance(res, str): logged_res = "<data of length %r>" % (len(res),)
87         logmsg("SUCCESS %r %r" % (request, logged_res,), level=OPERATIONAL)
88         return res
89
90     err = res
91     logmsg("RAISE %r %r" % (request, err.value), level=OPERATIONAL)
92     try:
93         if noisy: logmsg(traceback.format_exc(err.value), level=NOISY)
94     except Exception:  # pragma: no cover
95         pass
96
97     # The message argument to SFTPError must not reveal information that
98     # might compromise anonymity, if we are running over an anonymous network.
99
100     if err.check(SFTPError):
101         # original raiser of SFTPError has responsibility to ensure anonymity
102         raise err
103     if err.check(NoSuchChildError):
104         childname = _utf8(err.value.args[0])
105         raise SFTPError(FX_NO_SUCH_FILE, childname)
106     if err.check(NotWriteableError) or err.check(ChildOfWrongTypeError):
107         msg = _utf8(err.value.args[0])
108         raise SFTPError(FX_PERMISSION_DENIED, msg)
109     if err.check(ExistingChildError):
110         # Versions of SFTP after v3 (which is what twisted.conch implements)
111         # define a specific error code for this case: FX_FILE_ALREADY_EXISTS.
112         # However v3 doesn't; instead, other servers such as sshd return
113         # FX_FAILURE. The gvfs SFTP backend, for example, depends on this
114         # to translate the error to the equivalent of POSIX EEXIST, which is
115         # necessary for some picky programs (such as gedit).
116         msg = _utf8(err.value.args[0])
117         raise SFTPError(FX_FAILURE, msg)
118     if err.check(NotImplementedError):
119         raise SFTPError(FX_OP_UNSUPPORTED, _utf8(err.value))
120     if err.check(EOFError):
121         raise SFTPError(FX_EOF, "end of file reached")
122     if err.check(defer.FirstError):
123         _convert_error(err.value.subFailure, request)
124
125     # We assume that the error message is not anonymity-sensitive.
126     raise SFTPError(FX_FAILURE, _utf8(err.value))
127
128
129 def _repr_flags(flags):
130     return "|".join([f for f in
131                      [(flags & FXF_READ)   and "FXF_READ"   or None,
132                       (flags & FXF_WRITE)  and "FXF_WRITE"  or None,
133                       (flags & FXF_APPEND) and "FXF_APPEND" or None,
134                       (flags & FXF_CREAT)  and "FXF_CREAT"  or None,
135                       (flags & FXF_TRUNC)  and "FXF_TRUNC"  or None,
136                       (flags & FXF_EXCL)   and "FXF_EXCL"   or None,
137                      ]
138                      if f])
139
140
141 def _lsLine(name, attrs):
142     st_uid = "tahoe"
143     st_gid = "tahoe"
144     st_mtime = attrs.get("mtime", 0)
145     st_mode = attrs["permissions"]
146
147     # Some clients won't tolerate '?' in the size field (#1337).
148     st_size = attrs.get("size", 0)
149
150     # We don't know how many links there really are to this object.
151     st_nlink = 1
152
153     # Based on <http://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
154     # We can't call the version in Twisted because we might have a version earlier than
155     # <http://twistedmatrix.com/trac/changeset/25412> (released in Twisted 8.2).
156
157     mode = st_mode
158     perms = array.array('c', '-'*10)
159     ft = stat.S_IFMT(mode)
160     if   stat.S_ISDIR(ft): perms[0] = 'd'
161     elif stat.S_ISREG(ft): perms[0] = '-'
162     else: perms[0] = '?'
163     # user
164     if mode&stat.S_IRUSR: perms[1] = 'r'
165     if mode&stat.S_IWUSR: perms[2] = 'w'
166     if mode&stat.S_IXUSR: perms[3] = 'x'
167     # group
168     if mode&stat.S_IRGRP: perms[4] = 'r'
169     if mode&stat.S_IWGRP: perms[5] = 'w'
170     if mode&stat.S_IXGRP: perms[6] = 'x'
171     # other
172     if mode&stat.S_IROTH: perms[7] = 'r'
173     if mode&stat.S_IWOTH: perms[8] = 'w'
174     if mode&stat.S_IXOTH: perms[9] = 'x'
175     # suid/sgid never set
176
177     l = perms.tostring()
178     l += str(st_nlink).rjust(5) + ' '
179     un = str(st_uid)
180     l += un.ljust(9)
181     gr = str(st_gid)
182     l += gr.ljust(9)
183     sz = str(st_size)
184     l += sz.rjust(8)
185     l += ' '
186     day = 60 * 60 * 24
187     sixmo = day * 7 * 26
188     now = time()
189     if st_mtime + sixmo < now or st_mtime > now + day:
190         # mtime is more than 6 months ago, or more than one day in the future
191         l += strftime("%b %d  %Y ", localtime(st_mtime))
192     else:
193         l += strftime("%b %d %H:%M ", localtime(st_mtime))
194     l += name
195     return l
196
197
198 def _no_write(parent_readonly, child, metadata=None):
199     """Whether child should be listed as having read-only permissions in parent."""
200
201     if child.is_unknown():
202         return True
203     elif child.is_mutable():
204         return child.is_readonly()
205     elif parent_readonly or IDirectoryNode.providedBy(child):
206         return True
207     else:
208         return metadata is not None and metadata.get('no-write', False)
209
210
211 def _populate_attrs(childnode, metadata, size=None):
212     attrs = {}
213
214     # The permissions must have the S_IFDIR (040000) or S_IFREG (0100000)
215     # bits, otherwise the client may refuse to open a directory.
216     # Also, sshfs run as a non-root user requires files and directories
217     # to be world-readable/writeable.
218     # It is important that we never set the executable bits on files.
219     #
220     # Directories and unknown nodes have no size, and SFTP doesn't
221     # require us to make one up.
222     #
223     # childnode might be None, meaning that the file doesn't exist yet,
224     # but we're going to write it later.
225
226     if childnode and childnode.is_unknown():
227         perms = 0
228     elif childnode and IDirectoryNode.providedBy(childnode):
229         perms = S_IFDIR | 0777
230     else:
231         # For files, omit the size if we don't immediately know it.
232         if childnode and size is None:
233             size = childnode.get_size()
234         if size is not None:
235             assert isinstance(size, (int, long)) and not isinstance(size, bool), repr(size)
236             attrs['size'] = size
237         perms = S_IFREG | 0666
238
239     if metadata:
240         if metadata.get('no-write', False):
241             perms &= S_IFDIR | S_IFREG | 0555  # clear 'w' bits
242
243         # See webapi.txt for what these times mean.
244         # We would prefer to omit atime, but SFTP version 3 can only
245         # accept mtime if atime is also set.
246         if 'linkmotime' in metadata.get('tahoe', {}):
247             attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['tahoe']['linkmotime'])
248         elif 'mtime' in metadata:
249             attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['mtime'])
250
251         if 'linkcrtime' in metadata.get('tahoe', {}):
252             attrs['createtime'] = _to_sftp_time(metadata['tahoe']['linkcrtime'])
253
254     attrs['permissions'] = perms
255
256     # twisted.conch.ssh.filetransfer only implements SFTP version 3,
257     # which doesn't include SSH_FILEXFER_ATTR_FLAGS.
258
259     return attrs
260
261
262 def _attrs_to_metadata(attrs):
263     metadata = {}
264
265     for key in attrs:
266         if key == "mtime" or key == "ctime" or key == "createtime":
267             metadata[key] = long(attrs[key])
268         elif key.startswith("ext_"):
269             metadata[key] = str(attrs[key])
270
271     perms = attrs.get('permissions', stat.S_IWUSR)
272     if not (perms & stat.S_IWUSR):
273         metadata['no-write'] = True
274
275     return metadata
276
277
278 def _direntry_for(filenode_or_parent, childname, filenode=None):
279     assert isinstance(childname, (unicode, NoneType)), childname
280
281     if childname is None:
282         filenode_or_parent = filenode
283
284     if filenode_or_parent:
285         rw_uri = filenode_or_parent.get_write_uri()
286         if rw_uri and childname:
287             return rw_uri + "/" + childname.encode('utf-8')
288         else:
289             return rw_uri
290
291     return None
292
293
294 class OverwriteableFileConsumer(PrefixingLogMixin):
295     implements(IFinishableConsumer)
296     """I act both as a consumer for the download of the original file contents, and as a
297     wrapper for a temporary file that records the downloaded data and any overwrites.
298     I use a priority queue to keep track of which regions of the file have been overwritten
299     but not yet downloaded, so that the download does not clobber overwritten data.
300     I use another priority queue to record milestones at which to make callbacks
301     indicating that a given number of bytes have been downloaded.
302
303     The temporary file reflects the contents of the file that I represent, except that:
304      - regions that have neither been downloaded nor overwritten, if present,
305        contain garbage.
306      - the temporary file may be shorter than the represented file (it is never longer).
307        The latter's current size is stored in self.current_size.
308
309     This abstraction is mostly independent of SFTP. Consider moving it, if it is found
310     useful for other frontends."""
311
312     def __init__(self, download_size, tempfile_maker):
313         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
314         if noisy: self.log(".__init__(%r, %r)" % (download_size, tempfile_maker), level=NOISY)
315         self.download_size = download_size
316         self.current_size = download_size
317         self.f = tempfile_maker()
318         self.downloaded = 0
319         self.milestones = []  # empty heap of (offset, d)
320         self.overwrites = []  # empty heap of (start, end)
321         self.is_closed = False
322         self.done = self.when_reached(download_size)  # adds a milestone
323         self.is_done = False
324         def _signal_done(ign):
325             if noisy: self.log("DONE", level=NOISY)
326             self.is_done = True
327         self.done.addCallback(_signal_done)
328         self.producer = None
329
330     def get_file(self):
331         return self.f
332
333     def get_current_size(self):
334         return self.current_size
335
336     def set_current_size(self, size):
337         if noisy: self.log(".set_current_size(%r), current_size = %r, downloaded = %r" %
338                            (size, self.current_size, self.downloaded), level=NOISY)
339         if size < self.current_size or size < self.downloaded:
340             self.f.truncate(size)
341         if size > self.current_size:
342             self.overwrite(self.current_size, "\x00" * (size - self.current_size))
343         self.current_size = size
344
345         # make the invariant self.download_size <= self.current_size be true again
346         if size < self.download_size:
347             self.download_size = size
348
349         if self.downloaded >= self.download_size:
350             self.finish()
351
352     def registerProducer(self, p, streaming):
353         if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY)
354         if self.producer is not None:
355             raise RuntimeError("producer is already registered")
356
357         self.producer = p
358         if streaming:
359             # call resumeProducing once to start things off
360             p.resumeProducing()
361         else:
362             def _iterate():
363                 if not self.is_done:
364                     p.resumeProducing()
365                     eventually(_iterate)
366             _iterate()
367
368     def write(self, data):
369         if noisy: self.log(".write(<data of length %r>)" % (len(data),), level=NOISY)
370         if self.is_closed:
371             return
372
373         if self.downloaded >= self.download_size:
374             return
375
376         next_downloaded = self.downloaded + len(data)
377         if next_downloaded > self.download_size:
378             data = data[:(self.download_size - self.downloaded)]
379
380         while len(self.overwrites) > 0:
381             (start, end) = self.overwrites[0]
382             if start >= next_downloaded:
383                 # This and all remaining overwrites are after the data we just downloaded.
384                 break
385             if start > self.downloaded:
386                 # The data we just downloaded has been partially overwritten.
387                 # Write the prefix of it that precedes the overwritten region.
388                 self.f.seek(self.downloaded)
389                 self.f.write(data[:(start - self.downloaded)])
390
391             # This merges consecutive overwrites if possible, which allows us to detect the
392             # case where the download can be stopped early because the remaining region
393             # to download has already been fully overwritten.
394             heapq.heappop(self.overwrites)
395             while len(self.overwrites) > 0:
396                 (start1, end1) = self.overwrites[0]
397                 if start1 > end:
398                     break
399                 end = end1
400                 heapq.heappop(self.overwrites)
401
402             if end >= next_downloaded:
403                 # This overwrite extends past the downloaded data, so there is no
404                 # more data to consider on this call.
405                 heapq.heappush(self.overwrites, (next_downloaded, end))
406                 self._update_downloaded(next_downloaded)
407                 return
408             elif end >= self.downloaded:
409                 data = data[(end - self.downloaded):]
410                 self._update_downloaded(end)
411
412         self.f.seek(self.downloaded)
413         self.f.write(data)
414         self._update_downloaded(next_downloaded)
415
416     def _update_downloaded(self, new_downloaded):
417         self.downloaded = new_downloaded
418         milestone = new_downloaded
419         if len(self.overwrites) > 0:
420             (start, end) = self.overwrites[0]
421             if start <= new_downloaded and end > milestone:
422                 milestone = end
423
424         while len(self.milestones) > 0:
425             (next, d) = self.milestones[0]
426             if next > milestone:
427                 return
428             if noisy: self.log("MILESTONE %r %r" % (next, d), level=NOISY)
429             heapq.heappop(self.milestones)
430             eventually(d.callback, None)
431
432         if milestone >= self.download_size:
433             self.finish()
434
435     def overwrite(self, offset, data):
436         if noisy: self.log(".overwrite(%r, <data of length %r>)" % (offset, len(data)), level=NOISY)
437         if offset > self.current_size:
438             # Normally writing at an offset beyond the current end-of-file
439             # would leave a hole that appears filled with zeroes. However, an
440             # EncryptedTemporaryFile doesn't behave like that (if there is a
441             # hole in the file on disk, the zeroes that are read back will be
442             # XORed with the keystream). So we must explicitly write zeroes in
443             # the gap between the current EOF and the offset.
444
445             self.f.seek(self.current_size)
446             self.f.write("\x00" * (offset - self.current_size))
447             start = self.current_size
448         else:
449             self.f.seek(offset)
450             start = offset
451
452         self.f.write(data)
453         end = offset + len(data)
454         self.current_size = max(self.current_size, end)
455         if end > self.downloaded:
456             heapq.heappush(self.overwrites, (start, end))
457
458     def read(self, offset, length):
459         """When the data has been read, callback the Deferred that we return with this data.
460         Otherwise errback the Deferred that we return.
461         The caller must perform no more overwrites until the Deferred has fired."""
462
463         if noisy: self.log(".read(%r, %r), current_size = %r" % (offset, length, self.current_size), level=NOISY)
464
465         # Note that the overwrite method is synchronous. When a write request is processed
466         # (e.g. a writeChunk request on the async queue of GeneralSFTPFile), overwrite will
467         # be called and will update self.current_size if necessary before returning. Therefore,
468         # self.current_size will be up-to-date for a subsequent call to this read method, and
469         # so it is correct to do the check for a read past the end-of-file here.
470         if offset >= self.current_size:
471             def _eof(): raise EOFError("read past end of file")
472             return defer.execute(_eof)
473
474         if offset + length > self.current_size:
475             length = self.current_size - offset
476             if noisy: self.log("truncating read to %r bytes" % (length,), level=NOISY)
477
478         needed = min(offset + length, self.download_size)
479         d = self.when_reached(needed)
480         def _reached(ign):
481             # It is not necessarily the case that self.downloaded >= needed, because
482             # the file might have been truncated (thus truncating the download) and
483             # then extended.
484
485             assert self.current_size >= offset + length, (self.current_size, offset, length)
486             if noisy: self.log("self.f = %r" % (self.f,), level=NOISY)
487             self.f.seek(offset)
488             return self.f.read(length)
489         d.addCallback(_reached)
490         return d
491
492     def when_reached(self, index):
493         if noisy: self.log(".when_reached(%r)" % (index,), level=NOISY)
494         if index <= self.downloaded:  # already reached
495             if noisy: self.log("already reached %r" % (index,), level=NOISY)
496             return defer.succeed(None)
497         d = defer.Deferred()
498         def _reached(ign):
499             if noisy: self.log("reached %r" % (index,), level=NOISY)
500             return ign
501         d.addCallback(_reached)
502         heapq.heappush(self.milestones, (index, d))
503         return d
504
505     def when_done(self):
506         return self.done
507
508     def finish(self):
509         """Called by the producer when it has finished producing, or when we have
510         received enough bytes, or as a result of a close. Defined by IFinishableConsumer."""
511
512         while len(self.milestones) > 0:
513             (next, d) = self.milestones[0]
514             if noisy: self.log("MILESTONE FINISH %r %r" % (next, d), level=NOISY)
515             heapq.heappop(self.milestones)
516             # The callback means that the milestone has been reached if
517             # it is ever going to be. Note that the file may have been
518             # truncated to before the milestone.
519             eventually(d.callback, None)
520
521     def close(self):
522         if not self.is_closed:
523             self.is_closed = True
524             try:
525                 self.f.close()
526             except Exception, e:
527                 self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
528         self.finish()
529
530     def unregisterProducer(self):
531         pass
532
533
534 SIZE_THRESHOLD = 1000
535
536
537 class ShortReadOnlySFTPFile(PrefixingLogMixin):
538     implements(ISFTPFile)
539     """I represent a file handle to a particular file on an SFTP connection.
540     I am used only for short immutable files opened in read-only mode.
541     When I am created, the file contents start to be downloaded to memory.
542     self.async is used to delay read requests until the download has finished."""
543
544     def __init__(self, userpath, filenode, metadata):
545         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
546         if noisy: self.log(".__init__(%r, %r, %r)" % (userpath, filenode, metadata), level=NOISY)
547
548         assert isinstance(userpath, str) and IFileNode.providedBy(filenode), (userpath, filenode)
549         self.filenode = filenode
550         self.metadata = metadata
551         self.async = download_to_data(filenode)
552         self.closed = False
553
554     def readChunk(self, offset, length):
555         request = ".readChunk(%r, %r)" % (offset, length)
556         self.log(request, level=OPERATIONAL)
557
558         if self.closed:
559             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
560             return defer.execute(_closed)
561
562         d = defer.Deferred()
563         def _read(data):
564             if noisy: self.log("_read(<data of length %r>) in readChunk(%r, %r)" % (len(data), offset, length), level=NOISY)
565
566             # "In response to this request, the server will read as many bytes as it
567             #  can from the file (up to 'len'), and return them in a SSH_FXP_DATA
568             #  message.  If an error occurs or EOF is encountered before reading any
569             #  data, the server will respond with SSH_FXP_STATUS.  For normal disk
570             #  files, it is guaranteed that this will read the specified number of
571             #  bytes, or up to end of file."
572             #
573             # i.e. we respond with an EOF error iff offset is already at EOF.
574
575             if offset >= len(data):
576                 eventually(d.errback, SFTPError(FX_EOF, "read at or past end of file"))
577             else:
578                 eventually(d.callback, data[offset:offset+length])  # truncated if offset+length > len(data)
579             return data
580         self.async.addCallbacks(_read, eventually_errback(d))
581         d.addBoth(_convert_error, request)
582         return d
583
584     def writeChunk(self, offset, data):
585         self.log(".writeChunk(%r, <data of length %r>) denied" % (offset, len(data)), level=OPERATIONAL)
586
587         def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
588         return defer.execute(_denied)
589
590     def close(self):
591         self.log(".close()", level=OPERATIONAL)
592
593         self.closed = True
594         return defer.succeed(None)
595
596     def getAttrs(self):
597         request = ".getAttrs()"
598         self.log(request, level=OPERATIONAL)
599
600         if self.closed:
601             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
602             return defer.execute(_closed)
603
604         d = defer.execute(_populate_attrs, self.filenode, self.metadata)
605         d.addBoth(_convert_error, request)
606         return d
607
608     def setAttrs(self, attrs):
609         self.log(".setAttrs(%r) denied" % (attrs,), level=OPERATIONAL)
610         def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
611         return defer.execute(_denied)
612
613
614 class GeneralSFTPFile(PrefixingLogMixin):
615     implements(ISFTPFile)
616     """I represent a file handle to a particular file on an SFTP connection.
617     I wrap an instance of OverwriteableFileConsumer, which is responsible for
618     storing the file contents. In order to allow write requests to be satisfied
619     immediately, there is effectively a FIFO queue between requests made to this
620     file handle, and requests to my OverwriteableFileConsumer. This queue is
621     implemented by the callback chain of self.async.
622
623     When first constructed, I am in an 'unopened' state that causes most
624     operations to be delayed until 'open' is called."""
625
626     def __init__(self, userpath, flags, close_notify, convergence):
627         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
628         if noisy: self.log(".__init__(%r, %r = %r, %r, <convergence censored>)" %
629                            (userpath, flags, _repr_flags(flags), close_notify), level=NOISY)
630
631         assert isinstance(userpath, str), userpath
632         self.userpath = userpath
633         self.flags = flags
634         self.close_notify = close_notify
635         self.convergence = convergence
636         self.async = defer.Deferred()
637         # Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created.
638         self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL)
639         self.closed = False
640         self.abandoned = False
641         self.parent = None
642         self.childname = None
643         self.filenode = None
644         self.metadata = None
645
646         # self.consumer should only be relied on in callbacks for self.async, since it might
647         # not be set before then.
648         self.consumer = None
649
650     def open(self, parent=None, childname=None, filenode=None, metadata=None):
651         self.log(".open(parent=%r, childname=%r, filenode=%r, metadata=%r)" %
652                  (parent, childname, filenode, metadata), level=OPERATIONAL)
653
654         assert isinstance(childname, (unicode, NoneType)), childname
655         # If the file has been renamed, the new (parent, childname) takes precedence.
656         if self.parent is None:
657             self.parent = parent
658         if self.childname is None:
659             self.childname = childname
660         self.filenode = filenode
661         self.metadata = metadata
662
663         assert not self.closed, self
664         tempfile_maker = EncryptedTemporaryFile
665
666         if (self.flags & FXF_TRUNC) or not filenode:
667             # We're either truncating or creating the file, so we don't need the old contents.
668             self.consumer = OverwriteableFileConsumer(0, tempfile_maker)
669             self.consumer.finish()
670         else:
671             assert IFileNode.providedBy(filenode), filenode
672
673             self.async.addCallback(lambda ignored: filenode.get_best_readable_version())
674
675             def _read(version):
676                 if noisy: self.log("_read", level=NOISY)
677                 download_size = version.get_size()
678                 assert download_size is not None
679
680                 self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker)
681
682                 version.read(self.consumer, 0, None)
683             self.async.addCallback(_read)
684
685         eventually(self.async.callback, None)
686
687         if noisy: self.log("open done", level=NOISY)
688         return self
689
690     def get_userpath(self):
691         return self.userpath
692
693     def get_direntry(self):
694         return _direntry_for(self.parent, self.childname)
695
696     def rename(self, new_userpath, new_parent, new_childname):
697         self.log(".rename(%r, %r, %r)" % (new_userpath, new_parent, new_childname), level=OPERATIONAL)
698
699         assert isinstance(new_userpath, str) and isinstance(new_childname, unicode), (new_userpath, new_childname)
700         self.userpath = new_userpath
701         self.parent = new_parent
702         self.childname = new_childname
703
704     def abandon(self):
705         self.log(".abandon()", level=OPERATIONAL)
706
707         self.abandoned = True
708
709     def sync(self, ign=None):
710         # The ign argument allows some_file.sync to be used as a callback.
711         self.log(".sync()", level=OPERATIONAL)
712
713         d = defer.Deferred()
714         self.async.addBoth(eventually_callback(d))
715         def _done(res):
716             if noisy: self.log("_done(%r) in .sync()" % (res,), level=NOISY)
717             return res
718         d.addBoth(_done)
719         return d
720
721     def readChunk(self, offset, length):
722         request = ".readChunk(%r, %r)" % (offset, length)
723         self.log(request, level=OPERATIONAL)
724
725         if not (self.flags & FXF_READ):
726             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for reading")
727             return defer.execute(_denied)
728
729         if self.closed:
730             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
731             return defer.execute(_closed)
732
733         d = defer.Deferred()
734         def _read(ign):
735             if noisy: self.log("_read in readChunk(%r, %r)" % (offset, length), level=NOISY)
736             d2 = self.consumer.read(offset, length)
737             d2.addCallbacks(eventually_callback(d), eventually_errback(d))
738             # It is correct to drop d2 here.
739             return None
740         self.async.addCallbacks(_read, eventually_errback(d))
741         d.addBoth(_convert_error, request)
742         return d
743
744     def writeChunk(self, offset, data):
745         self.log(".writeChunk(%r, <data of length %r>)" % (offset, len(data)), level=OPERATIONAL)
746
747         if not (self.flags & FXF_WRITE):
748             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
749             return defer.execute(_denied)
750
751         if self.closed:
752             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")
753             return defer.execute(_closed)
754
755         self.has_changed = True
756
757         # Note that we return without waiting for the write to occur. Reads and
758         # close wait for prior writes, and will fail if any prior operation failed.
759         # This is ok because SFTP makes no guarantee that the write completes
760         # before the request does. In fact it explicitly allows write errors to be
761         # delayed until close:
762         #   "One should note that on some server platforms even a close can fail.
763         #    This can happen e.g. if the server operating system caches writes,
764         #    and an error occurs while flushing cached writes during the close."
765
766         def _write(ign):
767             if noisy: self.log("_write in .writeChunk(%r, <data of length %r>), current_size = %r" %
768                                (offset, len(data), self.consumer.get_current_size()), level=NOISY)
769             # FXF_APPEND means that we should always write at the current end of file.
770             write_offset = offset
771             if self.flags & FXF_APPEND:
772                 write_offset = self.consumer.get_current_size()
773
774             self.consumer.overwrite(write_offset, data)
775             if noisy: self.log("overwrite done", level=NOISY)
776             return None
777         self.async.addCallback(_write)
778         # don't addErrback to self.async, just allow subsequent async ops to fail.
779         return defer.succeed(None)
780
781     def close(self):
782         request = ".close()"
783         self.log(request, level=OPERATIONAL)
784
785         if self.closed:
786             return defer.succeed(None)
787
788         # This means that close has been called, not that the close has succeeded.
789         self.closed = True
790
791         if not (self.flags & (FXF_WRITE | FXF_CREAT)):
792             def _readonly_close():
793                 if self.consumer:
794                     self.consumer.close()
795             return defer.execute(_readonly_close)
796
797         # We must capture the abandoned, parent, and childname variables synchronously
798         # at the close call. This is needed by the correctness arguments in the comments
799         # for _abandon_any_heisenfiles and _rename_heisenfiles.
800         # Note that the file must have been opened before it can be closed.
801         abandoned = self.abandoned
802         parent = self.parent
803         childname = self.childname
804
805         # has_changed is set when writeChunk is called, not when the write occurs, so
806         # it is correct to optimize out the commit if it is False at the close call.
807         has_changed = self.has_changed
808
809         def _committed(res):
810             if noisy: self.log("_committed(%r)" % (res,), level=NOISY)
811
812             self.consumer.close()
813
814             # We must close_notify before re-firing self.async.
815             if self.close_notify:
816                 self.close_notify(self.userpath, self.parent, self.childname, self)
817             return res
818
819         def _close(ign):
820             d2 = self.consumer.when_done()
821             if self.filenode and self.filenode.is_mutable():
822                 self.log("update mutable file %r childname=%r metadata=%r" % (self.filenode, childname, self.metadata), level=OPERATIONAL)
823                 if self.metadata.get('no-write', False) and not self.filenode.is_readonly():
824                     assert parent and childname, (parent, childname, self.metadata)
825                     d2.addCallback(lambda ign: parent.set_metadata_for(childname, self.metadata))
826
827                 d2.addCallback(lambda ign: self.filenode.overwrite(MutableFileHandle(self.consumer.get_file())))
828             else:
829                 def _add_file(ign):
830                     self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
831                     u = FileHandle(self.consumer.get_file(), self.convergence)
832                     return parent.add_file(childname, u, metadata=self.metadata)
833                 d2.addCallback(_add_file)
834
835             d2.addBoth(_committed)
836             return d2
837
838         d = defer.Deferred()
839
840         # If the file has been abandoned, we don't want the close operation to get "stuck",
841         # even if self.async fails to re-fire. Doing the close independently of self.async
842         # in that case ensures that dropping an ssh connection is sufficient to abandon
843         # any heisenfiles that were not explicitly closed in that connection.
844         if abandoned or not has_changed:
845             d.addCallback(_committed)
846         else:
847             self.async.addCallback(_close)
848
849         self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
850         d.addBoth(_convert_error, request)
851         return d
852
853     def getAttrs(self):
854         request = ".getAttrs()"
855         self.log(request, level=OPERATIONAL)
856
857         if self.closed:
858             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
859             return defer.execute(_closed)
860
861         # Optimization for read-only handles, when we already know the metadata.
862         if not (self.flags & (FXF_WRITE | FXF_CREAT)) and self.metadata and self.filenode and not self.filenode.is_mutable():
863             return defer.succeed(_populate_attrs(self.filenode, self.metadata))
864
865         d = defer.Deferred()
866         def _get(ign):
867             if noisy: self.log("_get(%r) in %r, filenode = %r, metadata = %r" % (ign, request, self.filenode, self.metadata), level=NOISY)
868
869             # self.filenode might be None, but that's ok.
870             attrs = _populate_attrs(self.filenode, self.metadata, size=self.consumer.get_current_size())
871             eventually(d.callback, attrs)
872             return None
873         self.async.addCallbacks(_get, eventually_errback(d))
874         d.addBoth(_convert_error, request)
875         return d
876
877     def setAttrs(self, attrs, only_if_at=None):
878         request = ".setAttrs(%r, only_if_at=%r)" % (attrs, only_if_at)
879         self.log(request, level=OPERATIONAL)
880
881         if not (self.flags & FXF_WRITE):
882             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
883             return defer.execute(_denied)
884
885         if self.closed:
886             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot set attributes for a closed file handle")
887             return defer.execute(_closed)
888
889         size = attrs.get("size", None)
890         if size is not None and (not isinstance(size, (int, long)) or size < 0):
891             def _bad(): raise SFTPError(FX_BAD_MESSAGE, "new size is not a valid nonnegative integer")
892             return defer.execute(_bad)
893
894         d = defer.Deferred()
895         def _set(ign):
896             if noisy: self.log("_set(%r) in %r" % (ign, request), level=NOISY)
897             current_direntry = _direntry_for(self.parent, self.childname, self.filenode)
898             if only_if_at and only_if_at != current_direntry:
899                 if noisy: self.log("not setting attributes: current_direntry=%r in %r" %
900                                    (current_direntry, request), level=NOISY)
901                 return None
902
903             now = time()
904             self.metadata = update_metadata(self.metadata, _attrs_to_metadata(attrs), now)
905             if size is not None:
906                 # TODO: should we refuse to truncate a file opened with FXF_APPEND?
907                 # <http://allmydata.org/trac/tahoe-lafs/ticket/1037#comment:20>
908                 self.consumer.set_current_size(size)
909             eventually(d.callback, None)
910             return None
911         self.async.addCallbacks(_set, eventually_errback(d))
912         d.addBoth(_convert_error, request)
913         return d
914
915
916 class StoppableList:
917     def __init__(self, items):
918         self.items = items
919     def __iter__(self):
920         for i in self.items:
921             yield i
922     def close(self):
923         pass
924
925
926 class Reason:
927     def __init__(self, value):
928         self.value = value
929
930
931 # A "heisenfile" is a file that has been opened with write flags
932 # (FXF_WRITE and/or FXF_CREAT) and not yet close-notified.
933 # 'all_heisenfiles' maps from a direntry string to a list of
934 # GeneralSFTPFile.
935 #
936 # A direntry string is parent_write_uri + "/" + childname_utf8 for
937 # an immutable file, or file_write_uri for a mutable file.
938 # Updates to this dict are single-threaded.
939
940 all_heisenfiles = {}
941
942 def _reload():
943     global all_heisenfiles
944     all_heisenfiles = {}
945
946 class SFTPUserHandler(ConchUser, PrefixingLogMixin):
947     implements(ISFTPServer)
948     def __init__(self, client, rootnode, username):
949         ConchUser.__init__(self)
950         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=username)
951         if noisy: self.log(".__init__(%r, %r, %r)" % (client, rootnode, username), level=NOISY)
952
953         self.channelLookup["session"] = session.SSHSession
954         self.subsystemLookup["sftp"] = FileTransferServer
955
956         self._client = client
957         self._root = rootnode
958         self._username = username
959         self._convergence = client.convergence
960
961         # maps from UTF-8 paths for this user, to files written and still open
962         self._heisenfiles = {}
963
964     def gotVersion(self, otherVersion, extData):
965         self.log(".gotVersion(%r, %r)" % (otherVersion, extData), level=OPERATIONAL)
966
967         # advertise the same extensions as the OpenSSH SFTP server
968         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
969         return {'posix-rename@openssh.com': '1',
970                 'statvfs@openssh.com': '2',
971                 'fstatvfs@openssh.com': '2',
972                }
973
974     def logout(self):
975         self.log(".logout()", level=OPERATIONAL)
976
977         for files in self._heisenfiles.itervalues():
978             for f in files:
979                 f.abandon()
980
981     def _add_heisenfile_by_path(self, file):
982         self.log("._add_heisenfile_by_path(%r)" % (file,), level=OPERATIONAL)
983
984         userpath = file.get_userpath()
985         if userpath in self._heisenfiles:
986             self._heisenfiles[userpath] += [file]
987         else:
988             self._heisenfiles[userpath] = [file]
989
990     def _add_heisenfile_by_direntry(self, file):
991         self.log("._add_heisenfile_by_direntry(%r)" % (file,), level=OPERATIONAL)
992
993         direntry = file.get_direntry()
994         if direntry:
995             if direntry in all_heisenfiles:
996                 all_heisenfiles[direntry] += [file]
997             else:
998                 all_heisenfiles[direntry] = [file]
999
1000     def _abandon_any_heisenfiles(self, userpath, direntry):
1001         request = "._abandon_any_heisenfiles(%r, %r)" % (userpath, direntry)
1002         self.log(request, level=OPERATIONAL)
1003
1004         assert isinstance(userpath, str), userpath
1005
1006         # First we synchronously mark all heisenfiles matching the userpath or direntry
1007         # as abandoned, and remove them from the two heisenfile dicts. Then we .sync()
1008         # each file that we abandoned.
1009         #
1010         # For each file, the call to .abandon() occurs:
1011         #   * before the file is closed, in which case it will never be committed
1012         #     (uploaded+linked or published); or
1013         #   * after it is closed but before it has been close_notified, in which case the
1014         #     .sync() ensures that it has been committed (successfully or not) before we
1015         #     return.
1016         #
1017         # This avoids a race that might otherwise cause the file to be committed after
1018         # the remove operation has completed.
1019         #
1020         # We return a Deferred that fires with True if any files were abandoned (this
1021         # does not mean that they were not committed; it is used to determine whether
1022         # a NoSuchChildError from the attempt to delete the file should be suppressed).
1023
1024         files = []
1025         if direntry in all_heisenfiles:
1026             files = all_heisenfiles[direntry]
1027             del all_heisenfiles[direntry]
1028         if userpath in self._heisenfiles:
1029             files += self._heisenfiles[userpath]
1030             del self._heisenfiles[userpath]
1031
1032         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1033
1034         for f in files:
1035             f.abandon()
1036
1037         d = defer.succeed(None)
1038         for f in files:
1039             d.addBoth(f.sync)
1040
1041         def _done(ign):
1042             self.log("done %r" % (request,), level=OPERATIONAL)
1043             return len(files) > 0
1044         d.addBoth(_done)
1045         return d
1046
1047     def _rename_heisenfiles(self, from_userpath, from_parent, from_childname,
1048                             to_userpath, to_parent, to_childname, overwrite=True):
1049         request = ("._rename_heisenfiles(%r, %r, %r, %r, %r, %r, overwrite=%r)" %
1050                    (from_userpath, from_parent, from_childname, to_userpath, to_parent, to_childname, overwrite))
1051         self.log(request, level=OPERATIONAL)
1052
1053         assert (isinstance(from_userpath, str) and isinstance(from_childname, unicode) and
1054                 isinstance(to_userpath, str) and isinstance(to_childname, unicode)), \
1055                (from_userpath, from_childname, to_userpath, to_childname)
1056
1057         if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
1058
1059         # First we synchronously rename all heisenfiles matching the userpath or direntry.
1060         # Then we .sync() each file that we renamed.
1061         #
1062         # For each file, the call to .rename occurs:
1063         #   * before the file is closed, in which case it will be committed at the
1064         #     new direntry; or
1065         #   * after it is closed but before it has been close_notified, in which case the
1066         #     .sync() ensures that it has been committed (successfully or not) before we
1067         #     return.
1068         #
1069         # This avoids a race that might otherwise cause the file to be committed at the
1070         # old name after the rename operation has completed.
1071         #
1072         # Note that if overwrite is False, the caller should already have checked
1073         # whether a real direntry exists at the destination. It is possible that another
1074         # direntry (heisen or real) comes to exist at the destination after that check,
1075         # but in that case it is correct for the rename to succeed (and for the commit
1076         # of the heisenfile at the destination to possibly clobber the other entry, since
1077         # that can happen anyway when we have concurrent write handles to the same direntry).
1078         #
1079         # We return a Deferred that fires with True if any files were renamed (this
1080         # does not mean that they were not committed; it is used to determine whether
1081         # a NoSuchChildError from the rename attempt should be suppressed). If overwrite
1082         # is False and there were already heisenfiles at the destination userpath or
1083         # direntry, we return a Deferred that fails with SFTPError(FX_PERMISSION_DENIED).
1084
1085         from_direntry = _direntry_for(from_parent, from_childname)
1086         to_direntry = _direntry_for(to_parent, to_childname)
1087
1088         if noisy: self.log("from_direntry = %r, to_direntry = %r, len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
1089                            (from_direntry, to_direntry, len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
1090
1091         if not overwrite and (to_userpath in self._heisenfiles or to_direntry in all_heisenfiles):
1092             def _existing(): raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1093             if noisy: self.log("existing", level=NOISY)
1094             return defer.execute(_existing)
1095
1096         from_files = []
1097         if from_direntry in all_heisenfiles:
1098             from_files = all_heisenfiles[from_direntry]
1099             del all_heisenfiles[from_direntry]
1100         if from_userpath in self._heisenfiles:
1101             from_files += self._heisenfiles[from_userpath]
1102             del self._heisenfiles[from_userpath]
1103
1104         if noisy: self.log("from_files = %r in %r" % (from_files, request), level=NOISY)
1105
1106         for f in from_files:
1107             f.rename(to_userpath, to_parent, to_childname)
1108             self._add_heisenfile_by_path(f)
1109             self._add_heisenfile_by_direntry(f)
1110
1111         d = defer.succeed(None)
1112         for f in from_files:
1113             d.addBoth(f.sync)
1114
1115         def _done(ign):
1116             if noisy: self.log("done: len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
1117                                (len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
1118             return len(from_files) > 0
1119         d.addBoth(_done)
1120         return d
1121
1122     def _update_attrs_for_heisenfiles(self, userpath, direntry, attrs):
1123         request = "._update_attrs_for_heisenfiles(%r, %r, %r)" % (userpath, direntry, attrs)
1124         self.log(request, level=OPERATIONAL)
1125
1126         assert isinstance(userpath, str) and isinstance(direntry, str), (userpath, direntry)
1127
1128         files = []
1129         if direntry in all_heisenfiles:
1130             files = all_heisenfiles[direntry]
1131         if userpath in self._heisenfiles:
1132             files += self._heisenfiles[userpath]
1133
1134         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1135
1136         # We set the metadata for all heisenfiles at this path or direntry.
1137         # Since a direntry includes a write URI, we must have authority to
1138         # change the metadata of heisenfiles found in the all_heisenfiles dict.
1139         # However that's not necessarily the case for heisenfiles found by
1140         # path. Therefore we tell the setAttrs method of each file to only
1141         # perform the update if the file is at the correct direntry.
1142
1143         d = defer.succeed(None)
1144         for f in files:
1145             d.addBoth(f.setAttrs, attrs, only_if_at=direntry)
1146
1147         def _done(ign):
1148             self.log("done %r" % (request,), level=OPERATIONAL)
1149             # TODO: this should not return True if only_if_at caused all files to be skipped.
1150             return len(files) > 0
1151         d.addBoth(_done)
1152         return d
1153
1154     def _sync_heisenfiles(self, userpath, direntry, ignore=None):
1155         request = "._sync_heisenfiles(%r, %r, ignore=%r)" % (userpath, direntry, ignore)
1156         self.log(request, level=OPERATIONAL)
1157
1158         assert isinstance(userpath, str) and isinstance(direntry, (str, NoneType)), (userpath, direntry)
1159
1160         files = []
1161         if direntry in all_heisenfiles:
1162             files = all_heisenfiles[direntry]
1163         if userpath in self._heisenfiles:
1164             files += self._heisenfiles[userpath]
1165
1166         if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
1167
1168         d = defer.succeed(None)
1169         for f in files:
1170             if f is not ignore:
1171                 d.addBoth(f.sync)
1172
1173         def _done(ign):
1174             self.log("done %r" % (request,), level=OPERATIONAL)
1175             return None
1176         d.addBoth(_done)
1177         return d
1178
1179     def _remove_heisenfile(self, userpath, parent, childname, file_to_remove):
1180         if noisy: self.log("._remove_heisenfile(%r, %r, %r, %r)" % (userpath, parent, childname, file_to_remove), level=NOISY)
1181
1182         assert isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)), (userpath, childname)
1183
1184         direntry = _direntry_for(parent, childname)
1185         if direntry in all_heisenfiles:
1186             all_old_files = all_heisenfiles[direntry]
1187             all_new_files = [f for f in all_old_files if f is not file_to_remove]
1188             if len(all_new_files) > 0:
1189                 all_heisenfiles[direntry] = all_new_files
1190             else:
1191                 del all_heisenfiles[direntry]
1192
1193         if userpath in self._heisenfiles:
1194             old_files = self._heisenfiles[userpath]
1195             new_files = [f for f in old_files if f is not file_to_remove]
1196             if len(new_files) > 0:
1197                 self._heisenfiles[userpath] = new_files
1198             else:
1199                 del self._heisenfiles[userpath]
1200
1201         if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
1202
1203     def _make_file(self, existing_file, userpath, flags, parent=None, childname=None, filenode=None, metadata=None):
1204         if noisy: self.log("._make_file(%r, %r, %r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
1205                            (existing_file, userpath, flags, _repr_flags(flags), parent, childname, filenode, metadata),
1206                            level=NOISY)
1207
1208         assert (isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)) and
1209                 (metadata is None or 'no-write' in metadata)), (userpath, childname, metadata)
1210
1211         writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
1212         direntry = _direntry_for(parent, childname, filenode)
1213
1214         d = self._sync_heisenfiles(userpath, direntry, ignore=existing_file)
1215
1216         if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
1217             d.addCallback(lambda ign: ShortReadOnlySFTPFile(userpath, filenode, metadata))
1218         else:
1219             close_notify = None
1220             if writing:
1221                 close_notify = self._remove_heisenfile
1222
1223             d.addCallback(lambda ign: existing_file or GeneralSFTPFile(userpath, flags, close_notify, self._convergence))
1224             def _got_file(file):
1225                 file.open(parent=parent, childname=childname, filenode=filenode, metadata=metadata)
1226                 if writing:
1227                     self._add_heisenfile_by_direntry(file)
1228                 return file
1229             d.addCallback(_got_file)
1230         return d
1231
1232     def openFile(self, pathstring, flags, attrs, delay=None):
1233         request = ".openFile(%r, %r = %r, %r, delay=%r)" % (pathstring, flags, _repr_flags(flags), attrs, delay)
1234         self.log(request, level=OPERATIONAL)
1235
1236         # This is used for both reading and writing.
1237         # First exclude invalid combinations of flags, and empty paths.
1238
1239         if not (flags & (FXF_READ | FXF_WRITE)):
1240             def _bad_readwrite():
1241                 raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
1242             return defer.execute(_bad_readwrite)
1243
1244         if (flags & FXF_EXCL) and not (flags & FXF_CREAT):
1245             def _bad_exclcreat():
1246                 raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT")
1247             return defer.execute(_bad_exclcreat)
1248
1249         path = self._path_from_string(pathstring)
1250         if not path:
1251             def _emptypath(): raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty")
1252             return defer.execute(_emptypath)
1253
1254         # The combination of flags is potentially valid.
1255
1256         # To work around clients that have race condition bugs, a getAttr, rename, or
1257         # remove request following an 'open' request with FXF_WRITE or FXF_CREAT flags,
1258         # should succeed even if the 'open' request has not yet completed. So we now
1259         # synchronously add a file object into the self._heisenfiles dict, indexed
1260         # by its UTF-8 userpath. (We can't yet add it to the all_heisenfiles dict,
1261         # because we don't yet have a user-independent path for the file.) The file
1262         # object does not know its filenode, parent, or childname at this point.
1263
1264         userpath = self._path_to_utf8(path)
1265
1266         if flags & (FXF_WRITE | FXF_CREAT):
1267             file = GeneralSFTPFile(userpath, flags, self._remove_heisenfile, self._convergence)
1268             self._add_heisenfile_by_path(file)
1269         else:
1270             # We haven't decided which file implementation to use yet.
1271             file = None
1272
1273         desired_metadata = _attrs_to_metadata(attrs)
1274
1275         # Now there are two major cases:
1276         #
1277         #  1. The path is specified as /uri/FILECAP, with no parent directory.
1278         #     If the FILECAP is mutable and writeable, then we can open it in write-only
1279         #     or read/write mode (non-exclusively), otherwise we can only open it in
1280         #     read-only mode. The open should succeed immediately as long as FILECAP is
1281         #     a valid known filecap that grants the required permission.
1282         #
1283         #  2. The path is specified relative to a parent. We find the parent dirnode and
1284         #     get the child's URI and metadata if it exists. There are four subcases:
1285         #       a. the child does not exist: FXF_CREAT must be set, and we must be able
1286         #          to write to the parent directory.
1287         #       b. the child exists but is not a valid known filecap: fail
1288         #       c. the child is mutable: if we are trying to open it write-only or
1289         #          read/write, then we must be able to write to the file.
1290         #       d. the child is immutable: if we are trying to open it write-only or
1291         #          read/write, then we must be able to write to the parent directory.
1292         #
1293         # To reduce latency, open normally succeeds as soon as these conditions are
1294         # met, even though there might be a failure in downloading the existing file
1295         # or uploading a new one. However, there is an exception: if a file has been
1296         # written, then closed, and is now being reopened, then we have to delay the
1297         # open until the previous upload/publish has completed. This is necessary
1298         # because sshfs does not wait for the result of an FXF_CLOSE message before
1299         # reporting to the client that a file has been closed. It applies both to
1300         # mutable files, and to directory entries linked to an immutable file.
1301         #
1302         # Note that the permission checks below are for more precise error reporting on
1303         # the open call; later operations would fail even if we did not make these checks.
1304
1305         d = delay or defer.succeed(None)
1306         d.addCallback(lambda ign: self._get_root(path))
1307         def _got_root( (root, path) ):
1308             if root.is_unknown():
1309                 raise SFTPError(FX_PERMISSION_DENIED,
1310                                 "cannot open an unknown cap (or child of an unknown object). "
1311                                 "Upgrading the gateway to a later Tahoe-LAFS version may help")
1312             if not path:
1313                 # case 1
1314                 if noisy: self.log("case 1: root = %r, path[:-1] = %r" % (root, path[:-1]), level=NOISY)
1315                 if not IFileNode.providedBy(root):
1316                     raise SFTPError(FX_PERMISSION_DENIED,
1317                                     "cannot open a directory cap")
1318                 if (flags & FXF_WRITE) and root.is_readonly():
1319                     raise SFTPError(FX_PERMISSION_DENIED,
1320                                     "cannot write to a non-writeable filecap without a parent directory")
1321                 if flags & FXF_EXCL:
1322                     raise SFTPError(FX_FAILURE,
1323                                     "cannot create a file exclusively when it already exists")
1324
1325                 # The file does not need to be added to all_heisenfiles, because it is not
1326                 # associated with a directory entry that needs to be updated.
1327
1328                 metadata = update_metadata(None, desired_metadata, time())
1329
1330                 # We have to decide what to pass for the 'parent_readonly' argument to _no_write,
1331                 # given that we don't actually have a parent. This only affects the permissions
1332                 # reported by a getAttrs on this file handle in the case of an immutable file.
1333                 # We choose 'parent_readonly=True' since that will cause the permissions to be
1334                 # reported as r--r--r--, which is appropriate because an immutable file can't be
1335                 # written via this path.
1336
1337                 metadata['no-write'] = _no_write(True, root)
1338                 return self._make_file(file, userpath, flags, filenode=root, metadata=metadata)
1339             else:
1340                 # case 2
1341                 childname = path[-1]
1342
1343                 if noisy: self.log("case 2: root = %r, childname = %r, desired_metadata = %r, path[:-1] = %r" %
1344                                    (root, childname, desired_metadata, path[:-1]), level=NOISY)
1345                 d2 = root.get_child_at_path(path[:-1])
1346                 def _got_parent(parent):
1347                     if noisy: self.log("_got_parent(%r)" % (parent,), level=NOISY)
1348                     if parent.is_unknown():
1349                         raise SFTPError(FX_PERMISSION_DENIED,
1350                                         "cannot open a child of an unknown object. "
1351                                         "Upgrading the gateway to a later Tahoe-LAFS version may help")
1352
1353                     parent_readonly = parent.is_readonly()
1354                     d3 = defer.succeed(None)
1355                     if flags & FXF_EXCL:
1356                         # FXF_EXCL means that the link to the file (not the file itself) must
1357                         # be created atomically wrt updates by this storage client.
1358                         # That is, we need to create the link before returning success to the
1359                         # SFTP open request (and not just on close, as would normally be the
1360                         # case). We make the link initially point to a zero-length LIT file,
1361                         # which is consistent with what might happen on a POSIX filesystem.
1362
1363                         if parent_readonly:
1364                             raise SFTPError(FX_FAILURE,
1365                                             "cannot create a file exclusively when the parent directory is read-only")
1366
1367                         # 'overwrite=False' ensures failure if the link already exists.
1368                         # FIXME: should use a single call to set_uri and return (child, metadata) (#1035)
1369
1370                         zero_length_lit = "URI:LIT:"
1371                         if noisy: self.log("%r.set_uri(%r, None, readcap=%r, overwrite=False)" %
1372                                            (parent, zero_length_lit, childname), level=NOISY)
1373                         d3.addCallback(lambda ign: parent.set_uri(childname, None, readcap=zero_length_lit,
1374                                                                   metadata=desired_metadata, overwrite=False))
1375                         def _seturi_done(child):
1376                             if noisy: self.log("%r.get_metadata_for(%r)" % (parent, childname), level=NOISY)
1377                             d4 = parent.get_metadata_for(childname)
1378                             d4.addCallback(lambda metadata: (child, metadata))
1379                             return d4
1380                         d3.addCallback(_seturi_done)
1381                     else:
1382                         if noisy: self.log("%r.get_child_and_metadata(%r)" % (parent, childname), level=NOISY)
1383                         d3.addCallback(lambda ign: parent.get_child_and_metadata(childname))
1384
1385                     def _got_child( (filenode, current_metadata) ):
1386                         if noisy: self.log("_got_child( (%r, %r) )" % (filenode, current_metadata), level=NOISY)
1387
1388                         metadata = update_metadata(current_metadata, desired_metadata, time())
1389
1390                         # Ignore the permissions of the desired_metadata in an open call. The permissions
1391                         # can only be set by setAttrs.
1392                         metadata['no-write'] = _no_write(parent_readonly, filenode, current_metadata)
1393
1394                         if filenode.is_unknown():
1395                             raise SFTPError(FX_PERMISSION_DENIED,
1396                                             "cannot open an unknown cap. Upgrading the gateway "
1397                                             "to a later Tahoe-LAFS version may help")
1398                         if not IFileNode.providedBy(filenode):
1399                             raise SFTPError(FX_PERMISSION_DENIED,
1400                                             "cannot open a directory as if it were a file")
1401                         if (flags & FXF_WRITE) and metadata['no-write']:
1402                             raise SFTPError(FX_PERMISSION_DENIED,
1403                                             "cannot open a non-writeable file for writing")
1404
1405                         return self._make_file(file, userpath, flags, parent=parent, childname=childname,
1406                                                filenode=filenode, metadata=metadata)
1407                     def _no_child(f):
1408                         if noisy: self.log("_no_child(%r)" % (f,), level=NOISY)
1409                         f.trap(NoSuchChildError)
1410
1411                         if not (flags & FXF_CREAT):
1412                             raise SFTPError(FX_NO_SUCH_FILE,
1413                                             "the file does not exist, and was not opened with the creation (CREAT) flag")
1414                         if parent_readonly:
1415                             raise SFTPError(FX_PERMISSION_DENIED,
1416                                             "cannot create a file when the parent directory is read-only")
1417
1418                         return self._make_file(file, userpath, flags, parent=parent, childname=childname)
1419                     d3.addCallbacks(_got_child, _no_child)
1420                     return d3
1421
1422                 d2.addCallback(_got_parent)
1423                 return d2
1424
1425         d.addCallback(_got_root)
1426         def _remove_on_error(err):
1427             if file:
1428                 self._remove_heisenfile(userpath, None, None, file)
1429             return err
1430         d.addErrback(_remove_on_error)
1431         d.addBoth(_convert_error, request)
1432         return d
1433
1434     def renameFile(self, from_pathstring, to_pathstring, overwrite=False):
1435         request = ".renameFile(%r, %r)" % (from_pathstring, to_pathstring)
1436         self.log(request, level=OPERATIONAL)
1437
1438         from_path = self._path_from_string(from_pathstring)
1439         to_path = self._path_from_string(to_pathstring)
1440         from_userpath = self._path_to_utf8(from_path)
1441         to_userpath = self._path_to_utf8(to_path)
1442
1443         # the target directory must already exist
1444         d = deferredutil.gatherResults([self._get_parent_or_node(from_path),
1445                                         self._get_parent_or_node(to_path)])
1446         def _got( (from_pair, to_pair) ):
1447             if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r, overwrite=%r)" %
1448                                (from_pair, to_pair, from_pathstring, to_pathstring, overwrite), level=NOISY)
1449             (from_parent, from_childname) = from_pair
1450             (to_parent, to_childname) = to_pair
1451
1452             if from_childname is None:
1453                 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename a source object specified by URI")
1454             if to_childname is None:
1455                 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename to a destination specified by URI")
1456
1457             # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.5>
1458             # "It is an error if there already exists a file with the name specified
1459             #  by newpath."
1460             # OpenSSH's SFTP server returns FX_PERMISSION_DENIED for this error.
1461             #
1462             # For the standard SSH_FXP_RENAME operation, overwrite=False.
1463             # We also support the posix-rename@openssh.com extension, which uses overwrite=True.
1464
1465             d2 = defer.succeed(None)
1466             if not overwrite:
1467                 d2.addCallback(lambda ign: to_parent.get(to_childname))
1468                 def _expect_fail(res):
1469                     if not isinstance(res, Failure):
1470                         raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1471
1472                     # It is OK if we fail for errors other than NoSuchChildError, since that probably
1473                     # indicates some problem accessing the destination directory.
1474                     res.trap(NoSuchChildError)
1475                 d2.addBoth(_expect_fail)
1476
1477             # If there are heisenfiles to be written at the 'from' direntry, then ensure
1478             # they will now be written at the 'to' direntry instead.
1479             d2.addCallback(lambda ign:
1480                            self._rename_heisenfiles(from_userpath, from_parent, from_childname,
1481                                                     to_userpath, to_parent, to_childname, overwrite=overwrite))
1482
1483             def _move(renamed):
1484                 # FIXME: use move_child_to_path to avoid possible data loss due to #943
1485                 #d3 = from_parent.move_child_to_path(from_childname, to_root, to_path, overwrite=overwrite)
1486
1487                 d3 = from_parent.move_child_to(from_childname, to_parent, to_childname, overwrite=overwrite)
1488                 def _check(err):
1489                     if noisy: self.log("_check(%r) in .renameFile(%r, %r, overwrite=%r)" %
1490                                        (err, from_pathstring, to_pathstring, overwrite), level=NOISY)
1491
1492                     if not isinstance(err, Failure) or (renamed and err.check(NoSuchChildError)):
1493                         return None
1494                     if not overwrite and err.check(ExistingChildError):
1495                         raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
1496
1497                     return err
1498                 d3.addBoth(_check)
1499                 return d3
1500             d2.addCallback(_move)
1501             return d2
1502         d.addCallback(_got)
1503         d.addBoth(_convert_error, request)
1504         return d
1505
1506     def makeDirectory(self, pathstring, attrs):
1507         request = ".makeDirectory(%r, %r)" % (pathstring, attrs)
1508         self.log(request, level=OPERATIONAL)
1509
1510         path = self._path_from_string(pathstring)
1511         metadata = _attrs_to_metadata(attrs)
1512         if 'no-write' in metadata:
1513             def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "cannot create a directory that is initially read-only")
1514             return defer.execute(_denied)
1515
1516         d = self._get_root(path)
1517         d.addCallback(lambda (root, path):
1518                       self._get_or_create_directories(root, path, metadata))
1519         d.addBoth(_convert_error, request)
1520         return d
1521
1522     def _get_or_create_directories(self, node, path, metadata):
1523         if not IDirectoryNode.providedBy(node):
1524             # TODO: provide the name of the blocking file in the error message.
1525             def _blocked(): raise SFTPError(FX_FAILURE, "cannot create directory because there "
1526                                                         "is a file in the way") # close enough
1527             return defer.execute(_blocked)
1528
1529         if not path:
1530             return defer.succeed(node)
1531         d = node.get(path[0])
1532         def _maybe_create(f):
1533             f.trap(NoSuchChildError)
1534             return node.create_subdirectory(path[0])
1535         d.addErrback(_maybe_create)
1536         d.addCallback(self._get_or_create_directories, path[1:], metadata)
1537         return d
1538
1539     def removeFile(self, pathstring):
1540         request = ".removeFile(%r)" % (pathstring,)
1541         self.log(request, level=OPERATIONAL)
1542
1543         path = self._path_from_string(pathstring)
1544         d = self._remove_object(path, must_be_file=True)
1545         d.addBoth(_convert_error, request)
1546         return d
1547
1548     def removeDirectory(self, pathstring):
1549         request = ".removeDirectory(%r)" % (pathstring,)
1550         self.log(request, level=OPERATIONAL)
1551
1552         path = self._path_from_string(pathstring)
1553         d = self._remove_object(path, must_be_directory=True)
1554         d.addBoth(_convert_error, request)
1555         return d
1556
1557     def _remove_object(self, path, must_be_directory=False, must_be_file=False):
1558         userpath = self._path_to_utf8(path)
1559         d = self._get_parent_or_node(path)
1560         def _got_parent( (parent, childname) ):
1561             if childname is None:
1562                 raise SFTPError(FX_NO_SUCH_FILE, "cannot remove an object specified by URI")
1563
1564             direntry = _direntry_for(parent, childname)
1565             d2 = defer.succeed(False)
1566             if not must_be_directory:
1567                 d2.addCallback(lambda ign: self._abandon_any_heisenfiles(userpath, direntry))
1568
1569             d2.addCallback(lambda abandoned:
1570                            parent.delete(childname, must_exist=not abandoned,
1571                                          must_be_directory=must_be_directory, must_be_file=must_be_file))
1572             return d2
1573         d.addCallback(_got_parent)
1574         return d
1575
1576     def openDirectory(self, pathstring):
1577         request = ".openDirectory(%r)" % (pathstring,)
1578         self.log(request, level=OPERATIONAL)
1579
1580         path = self._path_from_string(pathstring)
1581         d = self._get_parent_or_node(path)
1582         def _got_parent_or_node( (parent_or_node, childname) ):
1583             if noisy: self.log("_got_parent_or_node( (%r, %r) ) in openDirectory(%r)" %
1584                                (parent_or_node, childname, pathstring), level=NOISY)
1585             if childname is None:
1586                 return parent_or_node
1587             else:
1588                 return parent_or_node.get(childname)
1589         d.addCallback(_got_parent_or_node)
1590         def _list(dirnode):
1591             if dirnode.is_unknown():
1592                 raise SFTPError(FX_PERMISSION_DENIED,
1593                                 "cannot list an unknown cap as a directory. Upgrading the gateway "
1594                                 "to a later Tahoe-LAFS version may help")
1595             if not IDirectoryNode.providedBy(dirnode):
1596                 raise SFTPError(FX_PERMISSION_DENIED,
1597                                 "cannot list a file as if it were a directory")
1598
1599             d2 = dirnode.list()
1600             def _render(children):
1601                 parent_readonly = dirnode.is_readonly()
1602                 results = []
1603                 for filename, (child, metadata) in children.iteritems():
1604                     # The file size may be cached or absent.
1605                     metadata['no-write'] = _no_write(parent_readonly, child, metadata)
1606                     attrs = _populate_attrs(child, metadata)
1607                     filename_utf8 = filename.encode('utf-8')
1608                     longname = _lsLine(filename_utf8, attrs)
1609                     results.append( (filename_utf8, longname, attrs) )
1610                 return StoppableList(results)
1611             d2.addCallback(_render)
1612             return d2
1613         d.addCallback(_list)
1614         d.addBoth(_convert_error, request)
1615         return d
1616
1617     def getAttrs(self, pathstring, followLinks):
1618         request = ".getAttrs(%r, followLinks=%r)" % (pathstring, followLinks)
1619         self.log(request, level=OPERATIONAL)
1620
1621         # When asked about a specific file, report its current size.
1622         # TODO: the modification time for a mutable file should be
1623         # reported as the update time of the best version. But that
1624         # information isn't currently stored in mutable shares, I think.
1625
1626         path = self._path_from_string(pathstring)
1627         userpath = self._path_to_utf8(path)
1628         d = self._get_parent_or_node(path)
1629         def _got_parent_or_node( (parent_or_node, childname) ):
1630             if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
1631
1632             # Some clients will incorrectly try to get the attributes
1633             # of a file immediately after opening it, before it has been put
1634             # into the all_heisenfiles table. This is a race condition bug in
1635             # the client, but we handle it anyway by calling .sync() on all
1636             # files matching either the path or the direntry.
1637
1638             direntry = _direntry_for(parent_or_node, childname)
1639             d2 = self._sync_heisenfiles(userpath, direntry)
1640
1641             if childname is None:
1642                 node = parent_or_node
1643                 d2.addCallback(lambda ign: node.get_current_size())
1644                 d2.addCallback(lambda size:
1645                                _populate_attrs(node, {'no-write': node.is_unknown() or node.is_readonly()}, size=size))
1646             else:
1647                 parent = parent_or_node
1648                 d2.addCallback(lambda ign: parent.get_child_and_metadata_at_path([childname]))
1649                 def _got( (child, metadata) ):
1650                     if noisy: self.log("_got( (%r, %r) )" % (child, metadata), level=NOISY)
1651                     assert IDirectoryNode.providedBy(parent), parent
1652                     metadata['no-write'] = _no_write(parent.is_readonly(), child, metadata)
1653                     d3 = child.get_current_size()
1654                     d3.addCallback(lambda size: _populate_attrs(child, metadata, size=size))
1655                     return d3
1656                 def _nosuch(err):
1657                     if noisy: self.log("_nosuch(%r)" % (err,), level=NOISY)
1658                     err.trap(NoSuchChildError)
1659                     if noisy: self.log("checking open files:\nself._heisenfiles = %r\nall_heisenfiles = %r\ndirentry=%r" %
1660                                        (self._heisenfiles, all_heisenfiles, direntry), level=NOISY)
1661                     if direntry in all_heisenfiles:
1662                         files = all_heisenfiles[direntry]
1663                         if len(files) == 0:  # pragma: no cover
1664                             return err
1665                         # use the heisenfile that was most recently opened
1666                         return files[-1].getAttrs()
1667                     return err
1668                 d2.addCallbacks(_got, _nosuch)
1669             return d2
1670         d.addCallback(_got_parent_or_node)
1671         d.addBoth(_convert_error, request)
1672         return d
1673
1674     def setAttrs(self, pathstring, attrs):
1675         request = ".setAttrs(%r, %r)" % (pathstring, attrs)
1676         self.log(request, level=OPERATIONAL)
1677
1678         if "size" in attrs:
1679             # this would require us to download and re-upload the truncated/extended
1680             # file contents
1681             def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "setAttrs wth size attribute unsupported")
1682             return defer.execute(_unsupported)
1683
1684         path = self._path_from_string(pathstring)
1685         userpath = self._path_to_utf8(path)
1686         d = self._get_parent_or_node(path)
1687         def _got_parent_or_node( (parent_or_node, childname) ):
1688             if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
1689
1690             direntry = _direntry_for(parent_or_node, childname)
1691             d2 = self._update_attrs_for_heisenfiles(userpath, direntry, attrs)
1692
1693             def _update(updated_heisenfiles):
1694                 if childname is None:
1695                     if updated_heisenfiles:
1696                         return None
1697                     raise SFTPError(FX_NO_SUCH_FILE, userpath)
1698                 else:
1699                     desired_metadata = _attrs_to_metadata(attrs)
1700                     if noisy: self.log("desired_metadata = %r" % (desired_metadata,), level=NOISY)
1701
1702                     d3 = parent_or_node.set_metadata_for(childname, desired_metadata)
1703                     def _nosuch(err):
1704                         if updated_heisenfiles:
1705                             err.trap(NoSuchChildError)
1706                         else:
1707                             return err
1708                     d3.addErrback(_nosuch)
1709                     return d3
1710             d2.addCallback(_update)
1711             d2.addCallback(lambda ign: None)
1712             return d2
1713         d.addCallback(_got_parent_or_node)
1714         d.addBoth(_convert_error, request)
1715         return d
1716
1717     def readLink(self, pathstring):
1718         self.log(".readLink(%r)" % (pathstring,), level=OPERATIONAL)
1719
1720         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "readLink")
1721         return defer.execute(_unsupported)
1722
1723     def makeLink(self, linkPathstring, targetPathstring):
1724         self.log(".makeLink(%r, %r)" % (linkPathstring, targetPathstring), level=OPERATIONAL)
1725
1726         # If this is implemented, note the reversal of arguments described in point 7 of
1727         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>.
1728
1729         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "makeLink")
1730         return defer.execute(_unsupported)
1731
1732     def extendedRequest(self, extensionName, extensionData):
1733         self.log(".extendedRequest(%r, <data of length %r>)" % (extensionName, len(extensionData)), level=OPERATIONAL)
1734
1735         # We implement the three main OpenSSH SFTP extensions; see
1736         # <http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL?rev=1.15>
1737
1738         if extensionName == 'posix-rename@openssh.com':
1739             def _bad(): raise SFTPError(FX_BAD_MESSAGE, "could not parse posix-rename@openssh.com request")
1740
1741             if 4 > len(extensionData): return defer.execute(_bad)
1742             (fromPathLen,) = struct.unpack('>L', extensionData[0:4])
1743             if 8 + fromPathLen > len(extensionData): return defer.execute(_bad)
1744
1745             (toPathLen,) = struct.unpack('>L', extensionData[(4 + fromPathLen):(8 + fromPathLen)])
1746             if 8 + fromPathLen + toPathLen != len(extensionData): return defer.execute(_bad)
1747
1748             fromPathstring = extensionData[4:(4 + fromPathLen)]
1749             toPathstring = extensionData[(8 + fromPathLen):]
1750             d = self.renameFile(fromPathstring, toPathstring, overwrite=True)
1751
1752             # Twisted conch assumes that the response from an extended request is either
1753             # an error, or an FXP_EXTENDED_REPLY. But it happens to do the right thing
1754             # (respond with an FXP_STATUS message) if we return a Failure with code FX_OK.
1755             def _succeeded(ign):
1756                 raise SFTPError(FX_OK, "request succeeded")
1757             d.addCallback(_succeeded)
1758             return d
1759
1760         if extensionName == 'statvfs@openssh.com' or extensionName == 'fstatvfs@openssh.com':
1761             # f_bsize and f_frsize should be the same to avoid a bug in 'df'
1762             return defer.succeed(struct.pack('>11Q',
1763                 1024,         # uint64  f_bsize     /* file system block size */
1764                 1024,         # uint64  f_frsize    /* fundamental fs block size */
1765                 628318530,    # uint64  f_blocks    /* number of blocks (unit f_frsize) */
1766                 314159265,    # uint64  f_bfree     /* free blocks in file system */
1767                 314159265,    # uint64  f_bavail    /* free blocks for non-root */
1768                 200000000,    # uint64  f_files     /* total file inodes */
1769                 100000000,    # uint64  f_ffree     /* free file inodes */
1770                 100000000,    # uint64  f_favail    /* free file inodes for non-root */
1771                 0x1AF5,       # uint64  f_fsid      /* file system id */
1772                 2,            # uint64  f_flag      /* bit mask = ST_NOSUID; not ST_RDONLY */
1773                 65535,        # uint64  f_namemax   /* maximum filename length */
1774                 ))
1775
1776         def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "unsupported %r request <data of length %r>" %
1777                                                                (extensionName, len(extensionData)))
1778         return defer.execute(_unsupported)
1779
1780     def realPath(self, pathstring):
1781         self.log(".realPath(%r)" % (pathstring,), level=OPERATIONAL)
1782
1783         return self._path_to_utf8(self._path_from_string(pathstring))
1784
1785     def _path_to_utf8(self, path):
1786         return (u"/" + u"/".join(path)).encode('utf-8')
1787
1788     def _path_from_string(self, pathstring):
1789         if noisy: self.log("CONVERT %r" % (pathstring,), level=NOISY)
1790
1791         assert isinstance(pathstring, str), pathstring
1792
1793         # The home directory is the root directory.
1794         pathstring = pathstring.strip("/")
1795         if pathstring == "" or pathstring == ".":
1796             path_utf8 = []
1797         else:
1798             path_utf8 = pathstring.split("/")
1799
1800         # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.2>
1801         # "Servers SHOULD interpret a path name component ".." as referring to
1802         #  the parent directory, and "." as referring to the current directory."
1803         path = []
1804         for p_utf8 in path_utf8:
1805             if p_utf8 == "..":
1806                 # ignore excess .. components at the root
1807                 if len(path) > 0:
1808                     path = path[:-1]
1809             elif p_utf8 != ".":
1810                 try:
1811                     p = p_utf8.decode('utf-8', 'strict')
1812                 except UnicodeError:
1813                     raise SFTPError(FX_NO_SUCH_FILE, "path could not be decoded as UTF-8")
1814                 path.append(p)
1815
1816         if noisy: self.log(" PATH %r" % (path,), level=NOISY)
1817         return path
1818
1819     def _get_root(self, path):
1820         # return Deferred (root, remaining_path)
1821         d = defer.succeed(None)
1822         if path and path[0] == u"uri":
1823             d.addCallback(lambda ign: self._client.create_node_from_uri(path[1].encode('utf-8')))
1824             d.addCallback(lambda root: (root, path[2:]))
1825         else:
1826             d.addCallback(lambda ign: (self._root, path))
1827         return d
1828
1829     def _get_parent_or_node(self, path):
1830         # return Deferred (parent, childname) or (node, None)
1831         d = self._get_root(path)
1832         def _got_root( (root, remaining_path) ):
1833             if not remaining_path:
1834                 return (root, None)
1835             else:
1836                 d2 = root.get_child_at_path(remaining_path[:-1])
1837                 d2.addCallback(lambda parent: (parent, remaining_path[-1]))
1838                 return d2
1839         d.addCallback(_got_root)
1840         return d
1841
1842
1843 class FakeTransport:
1844     implements(ITransport)
1845     def write(self, data):
1846         logmsg("FakeTransport.write(<data of length %r>)" % (len(data),), level=NOISY)
1847
1848     def writeSequence(self, data):
1849         logmsg("FakeTransport.writeSequence(...)", level=NOISY)
1850
1851     def loseConnection(self):
1852         logmsg("FakeTransport.loseConnection()", level=NOISY)
1853
1854     # getPeer and getHost can just raise errors, since we don't know what to return
1855
1856
1857 class ShellSession(PrefixingLogMixin):
1858     implements(ISession)
1859     def __init__(self, userHandler):
1860         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
1861         if noisy: self.log(".__init__(%r)" % (userHandler), level=NOISY)
1862
1863     def getPty(self, terminal, windowSize, attrs):
1864         self.log(".getPty(%r, %r, %r)" % (terminal, windowSize, attrs), level=OPERATIONAL)
1865
1866     def openShell(self, protocol):
1867         self.log(".openShell(%r)" % (protocol,), level=OPERATIONAL)
1868         if hasattr(protocol, 'transport') and protocol.transport is None:
1869             protocol.transport = FakeTransport()  # work around Twisted bug
1870
1871         return self._unsupported(protocol)
1872
1873     def execCommand(self, protocol, cmd):
1874         self.log(".execCommand(%r, %r)" % (protocol, cmd), level=OPERATIONAL)
1875         if hasattr(protocol, 'transport') and protocol.transport is None:
1876             protocol.transport = FakeTransport()  # work around Twisted bug
1877
1878         d = defer.succeed(None)
1879         if cmd == "df -P -k /":
1880             d.addCallback(lambda ign: protocol.write(
1881                           "Filesystem         1024-blocks      Used Available Capacity Mounted on\r\n"
1882                           "tahoe                628318530 314159265 314159265      50% /\r\n"))
1883             d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessDone(None))))
1884         else:
1885             d.addCallback(lambda ign: self._unsupported(protocol))
1886         return d
1887
1888     def _unsupported(self, protocol):
1889         d = defer.succeed(None)
1890         d.addCallback(lambda ign: protocol.errReceived(
1891                       "This server supports only the SFTP protocol. It does not support SCP,\r\n"
1892                       "interactive shell sessions, or commands other than one needed by sshfs.\r\n"))
1893         d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessTerminated(exitCode=1))))
1894         return d
1895
1896     def windowChanged(self, newWindowSize):
1897         self.log(".windowChanged(%r)" % (newWindowSize,), level=OPERATIONAL)
1898
1899     def eofReceived(self):
1900         self.log(".eofReceived()", level=OPERATIONAL)
1901
1902     def closed(self):
1903         self.log(".closed()", level=OPERATIONAL)
1904
1905
1906 # If you have an SFTPUserHandler and want something that provides ISession, you get
1907 # ShellSession(userHandler).
1908 # We use adaptation because this must be a different object to the SFTPUserHandler.
1909 components.registerAdapter(ShellSession, SFTPUserHandler, ISession)
1910
1911
1912 from allmydata.frontends.auth import AccountURLChecker, AccountFileChecker, NeedRootcapLookupScheme
1913
1914 class Dispatcher:
1915     implements(portal.IRealm)
1916     def __init__(self, client):
1917         self._client = client
1918
1919     def requestAvatar(self, avatarID, mind, interface):
1920         assert interface == IConchUser, interface
1921         rootnode = self._client.create_node_from_uri(avatarID.rootcap)
1922         handler = SFTPUserHandler(self._client, rootnode, avatarID.username)
1923         return (interface, handler, handler.logout)
1924
1925
1926 class SFTPServer(service.MultiService):
1927     def __init__(self, client, accountfile, accounturl,
1928                  sftp_portstr, pubkey_file, privkey_file):
1929         service.MultiService.__init__(self)
1930
1931         r = Dispatcher(client)
1932         p = portal.Portal(r)
1933
1934         if accountfile:
1935             c = AccountFileChecker(self, accountfile)
1936             p.registerChecker(c)
1937         if accounturl:
1938             c = AccountURLChecker(self, accounturl)
1939             p.registerChecker(c)
1940         if not accountfile and not accounturl:
1941             # we could leave this anonymous, with just the /uri/CAP form
1942             raise NeedRootcapLookupScheme("must provide an account file or URL")
1943
1944         pubkey = keys.Key.fromFile(pubkey_file)
1945         privkey = keys.Key.fromFile(privkey_file)
1946         class SSHFactory(factory.SSHFactory):
1947             publicKeys = {pubkey.sshType(): pubkey}
1948             privateKeys = {privkey.sshType(): privkey}
1949             def getPrimes(self):
1950                 try:
1951                     # if present, this enables diffie-hellman-group-exchange
1952                     return primes.parseModuliFile("/etc/ssh/moduli")
1953                 except IOError:
1954                     return None
1955
1956         f = SSHFactory()
1957         f.portal = p
1958
1959         s = strports.service(sftp_portstr, f)
1960         s.setServiceParent(self)