class CountingDataUploadable(upload.Data):
bytes_read = 0
+ interrupt_after = None
+ interrupt_after_d = None
+
def read(self, length):
self.bytes_read += length
+ if self.interrupt_after is not None:
+ if self.bytes_read > self.interrupt_after:
+ self.interrupt_after = None
+ self.interrupt_after_d.callback(self)
return upload.Data.read(self, length)
d.addCallback(_connected)
return d
+ def bounce_client(self, num):
+ c = self.clients[num]
+ d = c.disownServiceParent()
+ def _stopped(res):
+ new_c = client.Client(basedir=self.getdir("client%d" % num))
+ self.clients[num] = new_c
+ self.add_service(new_c)
+ return new_c.when_tub_ready()
+ d.addCallback(_stopped)
+ return d
+
def add_extra_node(self, client_num, helper_furl=None,
add_to_sparent=False):
# usually this node is *not* parented to our self.sparent, so we can
u1 = CountingDataUploadable(DATA, contenthashkey=contenthashkey)
u2 = CountingDataUploadable(DATA, contenthashkey=contenthashkey)
- # tell the upload to drop the connection after about 5kB
- u1.debug_interrupt = 5000
+ # we interrupt the connection after about 5kB by shutting down
+ # the helper, then restartingit.
+ u1.interrupt_after = 5000
+ u1.interrupt_after_d = defer.Deferred()
+ u1.interrupt_after_d.addCallback(lambda res:
+ self.bounce_client(0))
# sneak into the helper and reduce its chunk size, so that our
# debug_interrupt will sever the connection on about the fifth
# could use fireEventually() to stall. Since we don't have
# the right introduction hooks, the best we can do is use a
# fixed delay. TODO: this is fragile.
- return self.stall(None, 2.0)
+ u1.interrupt_after_d.addCallback(self.stall, 2.0)
+ return u1.interrupt_after_d
d.addCallbacks(_should_not_finish, _interrupted)
def _disconnected(res):
self._eu = IEncryptedUploadable(encrypted_uploadable)
self._offset = 0
self._bytes_sent = 0
- self._cutoff = None # set by debug options
- self._cutoff_cb = None
def remote_get_size(self):
return self._eu.get_size()
def _at_correct_offset(res):
assert offset == self._offset, "%d != %d" % (offset, self._offset)
- if self._cutoff is not None and offset+length > self._cutoff:
- self._cutoff_cb()
-
return self._read_encrypted(length, hash_only=False)
d.addCallback(_at_correct_offset)
self.log("helper says we need to upload")
# we need to upload the file
reu = RemoteEncryptedUploadable(self._encuploadable)
-
- # we have unit tests which want to interrupt the upload so they
- # can exercise resumability. They indicate this by adding debug_
- # attributes to the Uploadable.
- if hasattr(self._encuploadable.original, "debug_interrupt"):
- reu._cutoff = self._encuploadable.original.debug_interrupt
- def _cutoff():
- # simulate the loss of the connection to the helper
- self.log("debug_interrupt killing connection to helper",
- level=log.WEIRD)
- upload_helper.tracker.broker.transport.loseConnection()
- return
- reu._cutoff_cb = _cutoff
d = upload_helper.callRemote("upload", reu)
# this Deferred will fire with the upload results
return d