From: Brian Warner Date: Thu, 17 Jan 2008 08:16:56 +0000 (-0700) Subject: offloaded: upload.py: handle forward skips, to allow resumed uploads to send less... X-Git-Tag: allmydata-tahoe-0.8.0~298 X-Git-Url: https://git.rkrishnan.org/Site/Content/Exhibitors/%22news.html/?a=commitdiff_plain;h=812383a369e0faf517652f9f7fa302f4a0fb2aa1;p=tahoe-lafs%2Ftahoe-lafs.git offloaded: upload.py: handle forward skips, to allow resumed uploads to send less than all the data. We still read all the data (to hash it, 'paranoid mode'), but we don't send it over the wire --- diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index e156b4a3..f0774301 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -638,7 +638,7 @@ class RemoteEncryptedUploadable(Referenceable): def __init__(self, encrypted_uploadable): self._eu = IEncryptedUploadable(encrypted_uploadable) self._offset = 0 - self._bytes_read = 0 + self._bytes_sent = 0 self._cutoff = None # set by debug options self._cutoff_cb = None @@ -648,14 +648,32 @@ class RemoteEncryptedUploadable(Referenceable): return self._eu.get_all_encoding_parameters() def remote_read_encrypted(self, offset, length): - # we don't yet implement seek + # we don't support seek backwards, but we allow skipping forwards + precondition(offset >= 0, offset) + precondition(length >= 0, length) + lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length), + level=log.NOISY) + precondition(offset >= self._offset, offset, self._offset) + if offset > self._offset: + # read the data from disk anyways, to build up the hash tree + skip = offset - self._offset + log.msg("remote_read_encrypted skipping ahead to %d, skip=%d" % + (self._offset, skip), level=log.UNUSUAL, parent=lp) + d = self.remote_read_encrypted(self._offset, skip) + def _ignore(strings): + size = sum([len(data) for data in strings]) + self._bytes_sent -= size + return self.remote_read_encrypted(offset, length) + d.addCallback(_ignore) + return d + assert offset == self._offset, "%d != %d" % (offset, self._offset) if self._cutoff is not None and offset+length > self._cutoff: self._cutoff_cb() d = self._eu.read_encrypted(length) def _read(strings): size = sum([len(data) for data in strings]) - self._bytes_read += size + self._bytes_sent += size self._offset += size return strings d.addCallback(_read) @@ -729,10 +747,22 @@ class AssistedUploader: self.log("helper says we need to upload") # we need to upload the file reu = RemoteEncryptedUploadable(self._encuploadable) - if False: #"debug_stash_RemoteEncryptedUploadable" in self._options: - self._options["RemoteEncryptedUploadable"] = reu - if False: #"debug_interrupt" in self._options: - reu._cutoff = self._options["debug_interrupt"] + + # we have unit tests which want to interrupt the upload so they + # can exercise resumability. They indicate this by adding debug_ + # attributes to the Uploadable. + if hasattr(self._encuploadable.original, + "debug_stash_RemoteEncryptedUploadable"): + # we communicate back to them the same way. This may look + # weird, but, well, ok, it is. However, it is better than the + # barrage of options={} dictionaries that were flying around + # before. We could also do this by setting attributes on the + # class, but that doesn't make it easy to undo when we're + # done. TODO: find a cleaner way, maybe just a small options= + # dict somewhere. + self._encuploadable.original.debug_RemoteEncryptedUploadable = reu + if hasattr(self._encuploadable.original, "debug_interrupt"): + reu._cutoff = self._encuploadable.original.debug_interrupt def _cutoff(): # simulate the loss of the connection to the helper self.log("debug_interrupt killing connection to helper",