]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
repairer.py: wrap to 80cols. No code changes.
authorBrian Warner <warner@lothar.com>
Wed, 1 Jul 2009 00:00:47 +0000 (17:00 -0700)
committerBrian Warner <warner@lothar.com>
Wed, 1 Jul 2009 00:00:47 +0000 (17:00 -0700)
src/allmydata/immutable/repairer.py

index a02e8adb95eabc75bef646e0a71897949c8b8ec4..e04626cb878dcdc3b9e72e7957a131c5fc78b043 100644 (file)
@@ -12,36 +12,43 @@ from allmydata.immutable import download, upload
 import collections
 
 class Repairer(log.PrefixingLogMixin):
-    """ I generate any shares which were not available and upload them to servers.
-
-    Which servers?  Well, I just use the normal upload process, so any servers that will take
-    shares.  In fact, I even believe servers if they say that they already have shares even if
-    attempts to download those shares would fail because the shares are corrupted.
-
-    My process of uploading replacement shares proceeds in a segment-wise fashion -- first I ask
-    servers if they can hold the new shares, and wait until enough have agreed then I download
-    the first segment of the file and upload the first block of each replacement share, and only
-    after all those blocks have been uploaded do I download the second segment of the file and
-    upload the second block of each replacement share to its respective server.  (I do it this
-    way in order to minimize the amount of downloading I have to do and the amount of memory I
-    have to use at any one time.)
-
-    If any of the servers to which I am uploading replacement shares fails to accept the blocks
-    during this process, then I just stop using that server, abandon any share-uploads that were
-    going to that server, and proceed to finish uploading the remaining shares to their
-    respective servers.  At the end of my work, I produce an object which satisfies the
-    ICheckAndRepairResults interface (by firing the deferred that I returned from start() and
-    passing that check-and-repair-results object).
-
-    Before I send any new request to a server, I always ask the "monitor" object that was passed
-    into my constructor whether this task has been cancelled (by invoking its
-    raise_if_cancelled() method).
+    """I generate any shares which were not available and upload them to
+    servers.
+
+    Which servers? Well, I just use the normal upload process, so any servers
+    that will take shares. In fact, I even believe servers if they say that
+    they already have shares even if attempts to download those shares would
+    fail because the shares are corrupted.
+
+    My process of uploading replacement shares proceeds in a segment-wise
+    fashion -- first I ask servers if they can hold the new shares, and wait
+    until enough have agreed then I download the first segment of the file
+    and upload the first block of each replacement share, and only after all
+    those blocks have been uploaded do I download the second segment of the
+    file and upload the second block of each replacement share to its
+    respective server. (I do it this way in order to minimize the amount of
+    downloading I have to do and the amount of memory I have to use at any
+    one time.)
+
+    If any of the servers to which I am uploading replacement shares fails to
+    accept the blocks during this process, then I just stop using that
+    server, abandon any share-uploads that were going to that server, and
+    proceed to finish uploading the remaining shares to their respective
+    servers. At the end of my work, I produce an object which satisfies the
+    ICheckAndRepairResults interface (by firing the deferred that I returned
+    from start() and passing that check-and-repair-results object).
+
+    Before I send any new request to a server, I always ask the 'monitor'
+    object that was passed into my constructor whether this task has been
+    cancelled (by invoking its raise_if_cancelled() method).
     """
+
     def __init__(self, client, verifycap, monitor):
         assert precondition(isinstance(verifycap, CHKFileVerifierURI))
 
         logprefix = si_b2a(verifycap.storage_index)[:5]
-        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer", prefix=logprefix)
+        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer",
+                                       prefix=logprefix)
 
         self._client = client
         self._verifycap = verifycap
@@ -51,12 +58,14 @@ class Repairer(log.PrefixingLogMixin):
         self.log("starting repair")
         duc = DownUpConnector()
         sb = self._client.get_storage_broker()
-        dl = download.CiphertextDownloader(sb, self._verifycap, target=duc, monitor=self._monitor)
+        dl = download.CiphertextDownloader(sb, self._verifycap, target=duc,
+                                           monitor=self._monitor)
         ul = upload.CHKUploader(self._client)
 
         d = defer.Deferred()
 
-        # If the upload or the download fails or is stopped, then the repair failed.
+        # If the upload or the download fails or is stopped, then the repair
+        # failed.
         def _errb(f):
             d.errback(f)
             return None
@@ -76,63 +85,74 @@ class Repairer(log.PrefixingLogMixin):
 
 class DownUpConnector(log.PrefixingLogMixin):
     implements(IEncryptedUploadable, IDownloadTarget, IConsumer)
-    """ I act like an "encrypted uploadable" -- something that a local uploader can read
-    ciphertext from in order to upload the ciphertext.  However, unbeknownst to the uploader,
-    I actually download the ciphertext from a CiphertextDownloader instance as it is needed.
-
-    On the other hand, I act like a "download target" -- something that a local downloader can
-    write ciphertext to as it downloads the ciphertext.  That downloader doesn't realize, of
-    course, that I'm just turning around and giving the ciphertext to the uploader. """
-
-    # The theory behind this class is nice: just satisfy two separate interfaces.  The
-    # implementation is slightly horrible, because of "impedance mismatch" -- the downloader
-    # expects to be able to synchronously push data in, and the uploader expects to be able to
-    # read data out with a "read(THIS_SPECIFIC_LENGTH)" which returns a deferred.  The two
-    # interfaces have different APIs for pausing/unpausing.  The uploader requests metadata like
-    # size and encodingparams which the downloader provides either eventually or not at all
-    # (okay I just now extended the downloader to provide encodingparams).  Most of this
-    # slightly horrible code would disappear if CiphertextDownloader just used this object as an
-    # IConsumer (plus maybe a couple of other methods) and if the Uploader simply expected to be
-    # treated as an IConsumer (plus maybe a couple of other things).
+    """I act like an 'encrypted uploadable' -- something that a local
+    uploader can read ciphertext from in order to upload the ciphertext.
+    However, unbeknownst to the uploader, I actually download the ciphertext
+    from a CiphertextDownloader instance as it is needed.
+
+    On the other hand, I act like a 'download target' -- something that a
+    local downloader can write ciphertext to as it downloads the ciphertext.
+    That downloader doesn't realize, of course, that I'm just turning around
+    and giving the ciphertext to the uploader."""
+
+    # The theory behind this class is nice: just satisfy two separate
+    # interfaces. The implementation is slightly horrible, because of
+    # "impedance mismatch" -- the downloader expects to be able to
+    # synchronously push data in, and the uploader expects to be able to read
+    # data out with a "read(THIS_SPECIFIC_LENGTH)" which returns a deferred.
+    # The two interfaces have different APIs for pausing/unpausing. The
+    # uploader requests metadata like size and encodingparams which the
+    # downloader provides either eventually or not at all (okay I just now
+    # extended the downloader to provide encodingparams). Most of this
+    # slightly horrible code would disappear if CiphertextDownloader just
+    # used this object as an IConsumer (plus maybe a couple of other methods)
+    # and if the Uploader simply expected to be treated as an IConsumer (plus
+    # maybe a couple of other things).
 
     def __init__(self, buflim=2**19):
-        """ If we're already holding at least buflim bytes, then tell the downloader to pause
-        until we have less than buflim bytes."""
+        """If we're already holding at least buflim bytes, then tell the
+        downloader to pause until we have less than buflim bytes."""
         log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer")
         self.buflim = buflim
         self.bufs = collections.deque() # list of strings
         self.bufsiz = 0 # how many bytes total in bufs
 
-        self.next_read_ds = collections.deque() # list of deferreds which will fire with the requested ciphertext
-        self.next_read_lens = collections.deque() # how many bytes of ciphertext were requested by each deferred
+        # list of deferreds which will fire with the requested ciphertext
+        self.next_read_ds = collections.deque()
+
+        # how many bytes of ciphertext were requested by each deferred
+        self.next_read_lens = collections.deque()
 
         self._size_osol = observer.OneShotObserverList()
         self._encodingparams_osol = observer.OneShotObserverList()
         self._storageindex_osol = observer.OneShotObserverList()
         self._closed_to_pusher = False
 
-        # once seg size is available, the following attribute will be created to hold it:
+        # once seg size is available, the following attribute will be created
+        # to hold it:
 
-        # self.encodingparams # (provided by the object which is pushing data into me, required
-        # by the object which is pulling data out of me)
+        # self.encodingparams # (provided by the object which is pushing data
+        # into me, required by the object which is pulling data out of me)
 
         # open() will create the following attribute:
-        # self.size # size of the whole file (provided by the object which is pushing data into
-        # me, required by the object which is pulling data out of me)
+        # self.size # size of the whole file (provided by the object which is
+        # pushing data into me, required by the object which is pulling data
+        # out of me)
 
         # set_upload_status() will create the following attribute:
 
-        # self.upload_status # XXX do we need to actually update this?  Is anybody watching the
-        # results during a repair?
+        # self.upload_status # XXX do we need to actually update this? Is
+        # anybody watching the results during a repair?
 
     def _satisfy_reads_if_possible(self):
         assert bool(self.next_read_ds) == bool(self.next_read_lens)
-        while self.next_read_ds and ((self.bufsiz >= self.next_read_lens[0]) or self._closed_to_pusher):
+        while self.next_read_ds and ((self.bufsiz >= self.next_read_lens[0])
+                                     or self._closed_to_pusher):
             nrd = self.next_read_ds.popleft()
             nrl = self.next_read_lens.popleft()
 
-            # Pick out the requested number of bytes from self.bufs, turn it into a string, and
-            # callback the deferred with that.
+            # Pick out the requested number of bytes from self.bufs, turn it
+            # into a string, and callback the deferred with that.
             res = []
             ressize = 0
             while ressize < nrl and self.bufs:
@@ -150,8 +170,9 @@ class DownUpConnector(log.PrefixingLogMixin):
                 self.producer.resumeProducing()
             nrd.callback(res)
 
-    # methods to satisfy the IConsumer and IDownloadTarget interfaces
-    # (From the perspective of a downloader I am an IDownloadTarget and an IConsumer.)
+    # methods to satisfy the IConsumer and IDownloadTarget interfaces. (From
+    # the perspective of a downloader I am an IDownloadTarget and an
+    # IConsumer.)
     def registerProducer(self, producer, streaming):
         assert streaming # We know how to handle only streaming producers.
         self.producer = producer # the downloader
@@ -192,12 +213,14 @@ class DownUpConnector(log.PrefixingLogMixin):
             return self._size_osol.when_fired()
     def get_all_encoding_parameters(self):
         # We have to learn the encoding params from pusher.
-        if hasattr(self, 'encodingparams'): # attribute created by self.set_encodingparams()
+        if hasattr(self, 'encodingparams'):
+            # attribute created by self.set_encodingparams()
             return defer.succeed(self.encodingparams)
         else:
             return self._encodingparams_osol.when_fired()
     def read_encrypted(self, length, hash_only):
-        """ Returns a deferred which eventually fired with the requested ciphertext. """
+        """Returns a deferred which eventually fired with the requested
+        ciphertext."""
         precondition(length) # please don't ask to read 0 bytes
         d = defer.Deferred()
         self.next_read_ds.append(d)
@@ -206,7 +229,8 @@ class DownUpConnector(log.PrefixingLogMixin):
         return d
     def get_storage_index(self):
         # We have to learn the storage index from pusher.
-        if hasattr(self, 'storageindex'): # attribute created by self.set_storageindex()
+        if hasattr(self, 'storageindex'):
+            # attribute created by self.set_storageindex()
             return defer.succeed(self.storageindex)
         else:
             return self._storageindex.when_fired()