]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
offloaded: upload.py: handle forward skips, to allow resumed uploads to send less...
authorBrian Warner <warner@lothar.com>
Thu, 17 Jan 2008 08:16:56 +0000 (01:16 -0700)
committerBrian Warner <warner@lothar.com>
Thu, 17 Jan 2008 08:16:56 +0000 (01:16 -0700)
src/allmydata/upload.py

index e156b4a3ae9ea7ac67154475c485f8a90d44fb0c..f07743015145743da0ceda5abb191c5f6d10d292 100644 (file)
@@ -638,7 +638,7 @@ class RemoteEncryptedUploadable(Referenceable):
     def __init__(self, encrypted_uploadable):
         self._eu = IEncryptedUploadable(encrypted_uploadable)
         self._offset = 0
-        self._bytes_read = 0
+        self._bytes_sent = 0
         self._cutoff = None # set by debug options
         self._cutoff_cb = None
 
@@ -648,14 +648,32 @@ class RemoteEncryptedUploadable(Referenceable):
         return self._eu.get_all_encoding_parameters()
 
     def remote_read_encrypted(self, offset, length):
-        # we don't yet implement seek
+        # we don't support seek backwards, but we allow skipping forwards
+        precondition(offset >= 0, offset)
+        precondition(length >= 0, length)
+        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
+                     level=log.NOISY)
+        precondition(offset >= self._offset, offset, self._offset)
+        if offset > self._offset:
+            # read the data from disk anyways, to build up the hash tree
+            skip = offset - self._offset
+            log.msg("remote_read_encrypted skipping ahead to %d, skip=%d" %
+                    (self._offset, skip), level=log.UNUSUAL, parent=lp)
+            d = self.remote_read_encrypted(self._offset, skip)
+            def _ignore(strings):
+                size = sum([len(data) for data in strings])
+                self._bytes_sent -= size
+                return self.remote_read_encrypted(offset, length)
+            d.addCallback(_ignore)
+            return d
+
         assert offset == self._offset, "%d != %d" % (offset, self._offset)
         if self._cutoff is not None and offset+length > self._cutoff:
             self._cutoff_cb()
         d = self._eu.read_encrypted(length)
         def _read(strings):
             size = sum([len(data) for data in strings])
-            self._bytes_read += size
+            self._bytes_sent += size
             self._offset += size
             return strings
         d.addCallback(_read)
@@ -729,10 +747,22 @@ class AssistedUploader:
             self.log("helper says we need to upload")
             # we need to upload the file
             reu = RemoteEncryptedUploadable(self._encuploadable)
-            if False: #"debug_stash_RemoteEncryptedUploadable" in self._options:
-                self._options["RemoteEncryptedUploadable"] = reu
-            if False: #"debug_interrupt" in self._options:
-                reu._cutoff = self._options["debug_interrupt"]
+
+            # we have unit tests which want to interrupt the upload so they
+            # can exercise resumability. They indicate this by adding debug_
+            # attributes to the Uploadable.
+            if hasattr(self._encuploadable.original,
+                       "debug_stash_RemoteEncryptedUploadable"):
+                # we communicate back to them the same way. This may look
+                # weird, but, well, ok, it is. However, it is better than the
+                # barrage of options={} dictionaries that were flying around
+                # before. We could also do this by setting attributes on the
+                # class, but that doesn't make it easy to undo when we're
+                # done. TODO: find a cleaner way, maybe just a small options=
+                # dict somewhere.
+                self._encuploadable.original.debug_RemoteEncryptedUploadable = reu
+            if hasattr(self._encuploadable.original, "debug_interrupt"):
+                reu._cutoff = self._encuploadable.original.debug_interrupt
                 def _cutoff():
                     # simulate the loss of the connection to the helper
                     self.log("debug_interrupt killing connection to helper",