def __init__(self, check_abort, download_size, tempfile_maker):
PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
+ if noisy: self.log(".__init__(%r, %r, %r)" % (check_abort, download_size, tempfile_maker), level=NOISY)
self.check_abort = check_abort
self.download_size = download_size
self.current_size = download_size
self.milestones = [] # empty heap of (offset, d)
self.overwrites = [] # empty heap of (start, end)
self.done = self.when_reached(download_size) # adds a milestone
+ self.is_done = False
+ def _signal_done(ign):
+ if noisy: self.log("DONE", level=NOISY)
+ self.is_done = True
+ self.done.addCallback(_signal_done)
self.producer = None
def get_file(self):
self.finish()
def registerProducer(self, p, streaming):
+ if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY)
self.producer = p
if streaming:
# call resumeProducing once to start things off
p.resumeProducing()
else:
- while not self.done:
- p.resumeProducing()
+ def _iterate():
+ if not self.is_done:
+ p.resumeProducing()
+ eventually(_iterate)
+ _iterate()
def write(self, data):
if noisy: self.log(".write(<data of length %r>)" % (len(data),), level=NOISY)
if offset + length > self.current_size:
length = self.current_size - offset
+ if noisy: self.log("truncating read to %r bytes" % (length,), level=NOISY)
needed = min(offset + length, self.download_size)
d = self.when_reached(needed)
SIZE_THRESHOLD = 1000
def _make_sftp_file(check_abort, flags, convergence, parent=None, childname=None, filenode=None, metadata=None):
- if noisy: logmsg("_make_sftp_file(%r, %r, %r, parent=%r, childname=%r, filenode=%r, metadata=%r" %
- (check_abort, flags, convergence, parent, childname, filenode, metadata), level=NOISY)
+ if noisy: logmsg("_make_sftp_file(%r, %r, <convergence censored>, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
+ (check_abort, flags, parent, childname, filenode, metadata), level=NOISY)
if not (flags & (FXF_WRITE | FXF_CREAT)) and (flags & FXF_READ) and filenode and \
not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
def __init__(self, check_abort, flags, convergence, parent=None, childname=None, filenode=None, metadata=None):
PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
- if noisy: self.log(".__init__(%r, %r, %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
- (check_abort, flags, convergence, parent, childname, filenode, metadata), level=NOISY)
+ if noisy: self.log(".__init__(%r, %r, <convergence censored>, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
+ (check_abort, flags, parent, childname, filenode, metadata), level=NOISY)
self.check_abort = check_abort
self.flags = flags
download_size = filenode.get_size()
assert download_size is not None
self.consumer = OverwriteableFileConsumer(self.check_abort, download_size, tempfile_maker)
- self.async.addCallback(lambda ign: filenode.read(self.consumer, 0, None))
+ def _read(ign):
+ if noisy: self.log("_read immutable", level=NOISY)
+ filenode.read(self.consumer, 0, None)
+ self.async.addCallback(_read)
def readChunk(self, offset, length):
self.log(".readChunk(%r, %r)" % (offset, length), level=OPERATIONAL)
# and an error occurs while flushing cached writes during the close."
def _write(ign):
+ if noisy: self.log("_write in .writeChunk(%r, <data of length %r>), current_size = %r" %
+ (offset, len(data), self.consumer.get_current_size()), level=NOISY)
# FXF_APPEND means that we should always write at the current end of file.
write_offset = offset
if self.flags & FXF_APPEND:
from twisted.conch.ssh import filetransfer as sftp
from allmydata.frontends import sftpd
-# FIXME remove this
-#from twisted.internet.base import DelayedCall
-#DelayedCall.debug = True
-
import traceback
"""
d.addCallback(lambda node: download_to_data(node))
d.addCallback(lambda data: self.failUnlessReallyEqual(data, "0123456789"))
- """
# test that writing a zero-length file with EXCL only updates the directory once
d.addCallback(lambda ign:
self.handler.openFile("zerolength", sftp.FXF_WRITE | sftp.FXF_CREAT |
d2.addCallback(lambda node: download_to_data(node))
d2.addCallback(lambda data: self.failUnlessReallyEqual(data, ""))
- d2 = wf.writeChunk(10, "0123456789")
- d2.addCallback(wf.writeChunk(5, "01234"))
+ d2.addCallback(lambda ign: wf.writeChunk(10, "0123456789"))
+ d2.addCallback(lambda ign: wf.writeChunk(5, "01234"))
d2.addCallback(lambda ign: wf.close())
return d2
d.addCallback(_write_excl_append)
d.addCallback(lambda ign: self.root.get(u"exclappend"))
d.addCallback(lambda node: download_to_data(node))
d.addCallback(lambda data: self.failUnlessReallyEqual(data, "012345678901234"))
- """
# test WRITE | CREAT without TRUNC
d.addCallback(lambda ign:
d.addCallback(_check_same_file)
d.addCallback(lambda data: self.failUnlessReallyEqual(data, "mutable new! contents"))
- """
# test READ | WRITE without CREAT or TRUNC
d.addCallback(lambda ign:
self.handler.openFile("small", sftp.FXF_READ | sftp.FXF_WRITE, {}))
def _read_write(rwf):
- d2 = rwf.writeChunk(8, "0123")
+ d2 = rwf.writeChunk(8, "0123")
d2.addCallback(lambda ign: rwf.readChunk(0, 100))
d2.addCallback(lambda data: self.failUnlessReallyEqual(data, "012345670123"))
d2.addCallback(lambda ign: rwf.close())
d.addCallback(lambda ign: self.root.get(u"small"))
d.addCallback(lambda node: download_to_data(node))
d.addCallback(lambda data: self.failUnlessReallyEqual(data, "012345670123"))
- """
+
return d
def test_removeFile(self):