From 8fca155a668e117024f1fb1378c52016f6042344 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Tue, 30 Jun 2009 17:00:47 -0700 Subject: [PATCH] repairer.py: wrap to 80cols. No code changes. --- src/allmydata/immutable/repairer.py | 152 ++++++++++++++++------------ 1 file changed, 88 insertions(+), 64 deletions(-) diff --git a/src/allmydata/immutable/repairer.py b/src/allmydata/immutable/repairer.py index a02e8adb..e04626cb 100644 --- a/src/allmydata/immutable/repairer.py +++ b/src/allmydata/immutable/repairer.py @@ -12,36 +12,43 @@ from allmydata.immutable import download, upload import collections class Repairer(log.PrefixingLogMixin): - """ I generate any shares which were not available and upload them to servers. - - Which servers? Well, I just use the normal upload process, so any servers that will take - shares. In fact, I even believe servers if they say that they already have shares even if - attempts to download those shares would fail because the shares are corrupted. - - My process of uploading replacement shares proceeds in a segment-wise fashion -- first I ask - servers if they can hold the new shares, and wait until enough have agreed then I download - the first segment of the file and upload the first block of each replacement share, and only - after all those blocks have been uploaded do I download the second segment of the file and - upload the second block of each replacement share to its respective server. (I do it this - way in order to minimize the amount of downloading I have to do and the amount of memory I - have to use at any one time.) - - If any of the servers to which I am uploading replacement shares fails to accept the blocks - during this process, then I just stop using that server, abandon any share-uploads that were - going to that server, and proceed to finish uploading the remaining shares to their - respective servers. At the end of my work, I produce an object which satisfies the - ICheckAndRepairResults interface (by firing the deferred that I returned from start() and - passing that check-and-repair-results object). - - Before I send any new request to a server, I always ask the "monitor" object that was passed - into my constructor whether this task has been cancelled (by invoking its - raise_if_cancelled() method). + """I generate any shares which were not available and upload them to + servers. + + Which servers? Well, I just use the normal upload process, so any servers + that will take shares. In fact, I even believe servers if they say that + they already have shares even if attempts to download those shares would + fail because the shares are corrupted. + + My process of uploading replacement shares proceeds in a segment-wise + fashion -- first I ask servers if they can hold the new shares, and wait + until enough have agreed then I download the first segment of the file + and upload the first block of each replacement share, and only after all + those blocks have been uploaded do I download the second segment of the + file and upload the second block of each replacement share to its + respective server. (I do it this way in order to minimize the amount of + downloading I have to do and the amount of memory I have to use at any + one time.) + + If any of the servers to which I am uploading replacement shares fails to + accept the blocks during this process, then I just stop using that + server, abandon any share-uploads that were going to that server, and + proceed to finish uploading the remaining shares to their respective + servers. At the end of my work, I produce an object which satisfies the + ICheckAndRepairResults interface (by firing the deferred that I returned + from start() and passing that check-and-repair-results object). + + Before I send any new request to a server, I always ask the 'monitor' + object that was passed into my constructor whether this task has been + cancelled (by invoking its raise_if_cancelled() method). """ + def __init__(self, client, verifycap, monitor): assert precondition(isinstance(verifycap, CHKFileVerifierURI)) logprefix = si_b2a(verifycap.storage_index)[:5] - log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer", prefix=logprefix) + log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer", + prefix=logprefix) self._client = client self._verifycap = verifycap @@ -51,12 +58,14 @@ class Repairer(log.PrefixingLogMixin): self.log("starting repair") duc = DownUpConnector() sb = self._client.get_storage_broker() - dl = download.CiphertextDownloader(sb, self._verifycap, target=duc, monitor=self._monitor) + dl = download.CiphertextDownloader(sb, self._verifycap, target=duc, + monitor=self._monitor) ul = upload.CHKUploader(self._client) d = defer.Deferred() - # If the upload or the download fails or is stopped, then the repair failed. + # If the upload or the download fails or is stopped, then the repair + # failed. def _errb(f): d.errback(f) return None @@ -76,63 +85,74 @@ class Repairer(log.PrefixingLogMixin): class DownUpConnector(log.PrefixingLogMixin): implements(IEncryptedUploadable, IDownloadTarget, IConsumer) - """ I act like an "encrypted uploadable" -- something that a local uploader can read - ciphertext from in order to upload the ciphertext. However, unbeknownst to the uploader, - I actually download the ciphertext from a CiphertextDownloader instance as it is needed. - - On the other hand, I act like a "download target" -- something that a local downloader can - write ciphertext to as it downloads the ciphertext. That downloader doesn't realize, of - course, that I'm just turning around and giving the ciphertext to the uploader. """ - - # The theory behind this class is nice: just satisfy two separate interfaces. The - # implementation is slightly horrible, because of "impedance mismatch" -- the downloader - # expects to be able to synchronously push data in, and the uploader expects to be able to - # read data out with a "read(THIS_SPECIFIC_LENGTH)" which returns a deferred. The two - # interfaces have different APIs for pausing/unpausing. The uploader requests metadata like - # size and encodingparams which the downloader provides either eventually or not at all - # (okay I just now extended the downloader to provide encodingparams). Most of this - # slightly horrible code would disappear if CiphertextDownloader just used this object as an - # IConsumer (plus maybe a couple of other methods) and if the Uploader simply expected to be - # treated as an IConsumer (plus maybe a couple of other things). + """I act like an 'encrypted uploadable' -- something that a local + uploader can read ciphertext from in order to upload the ciphertext. + However, unbeknownst to the uploader, I actually download the ciphertext + from a CiphertextDownloader instance as it is needed. + + On the other hand, I act like a 'download target' -- something that a + local downloader can write ciphertext to as it downloads the ciphertext. + That downloader doesn't realize, of course, that I'm just turning around + and giving the ciphertext to the uploader.""" + + # The theory behind this class is nice: just satisfy two separate + # interfaces. The implementation is slightly horrible, because of + # "impedance mismatch" -- the downloader expects to be able to + # synchronously push data in, and the uploader expects to be able to read + # data out with a "read(THIS_SPECIFIC_LENGTH)" which returns a deferred. + # The two interfaces have different APIs for pausing/unpausing. The + # uploader requests metadata like size and encodingparams which the + # downloader provides either eventually or not at all (okay I just now + # extended the downloader to provide encodingparams). Most of this + # slightly horrible code would disappear if CiphertextDownloader just + # used this object as an IConsumer (plus maybe a couple of other methods) + # and if the Uploader simply expected to be treated as an IConsumer (plus + # maybe a couple of other things). def __init__(self, buflim=2**19): - """ If we're already holding at least buflim bytes, then tell the downloader to pause - until we have less than buflim bytes.""" + """If we're already holding at least buflim bytes, then tell the + downloader to pause until we have less than buflim bytes.""" log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer") self.buflim = buflim self.bufs = collections.deque() # list of strings self.bufsiz = 0 # how many bytes total in bufs - self.next_read_ds = collections.deque() # list of deferreds which will fire with the requested ciphertext - self.next_read_lens = collections.deque() # how many bytes of ciphertext were requested by each deferred + # list of deferreds which will fire with the requested ciphertext + self.next_read_ds = collections.deque() + + # how many bytes of ciphertext were requested by each deferred + self.next_read_lens = collections.deque() self._size_osol = observer.OneShotObserverList() self._encodingparams_osol = observer.OneShotObserverList() self._storageindex_osol = observer.OneShotObserverList() self._closed_to_pusher = False - # once seg size is available, the following attribute will be created to hold it: + # once seg size is available, the following attribute will be created + # to hold it: - # self.encodingparams # (provided by the object which is pushing data into me, required - # by the object which is pulling data out of me) + # self.encodingparams # (provided by the object which is pushing data + # into me, required by the object which is pulling data out of me) # open() will create the following attribute: - # self.size # size of the whole file (provided by the object which is pushing data into - # me, required by the object which is pulling data out of me) + # self.size # size of the whole file (provided by the object which is + # pushing data into me, required by the object which is pulling data + # out of me) # set_upload_status() will create the following attribute: - # self.upload_status # XXX do we need to actually update this? Is anybody watching the - # results during a repair? + # self.upload_status # XXX do we need to actually update this? Is + # anybody watching the results during a repair? def _satisfy_reads_if_possible(self): assert bool(self.next_read_ds) == bool(self.next_read_lens) - while self.next_read_ds and ((self.bufsiz >= self.next_read_lens[0]) or self._closed_to_pusher): + while self.next_read_ds and ((self.bufsiz >= self.next_read_lens[0]) + or self._closed_to_pusher): nrd = self.next_read_ds.popleft() nrl = self.next_read_lens.popleft() - # Pick out the requested number of bytes from self.bufs, turn it into a string, and - # callback the deferred with that. + # Pick out the requested number of bytes from self.bufs, turn it + # into a string, and callback the deferred with that. res = [] ressize = 0 while ressize < nrl and self.bufs: @@ -150,8 +170,9 @@ class DownUpConnector(log.PrefixingLogMixin): self.producer.resumeProducing() nrd.callback(res) - # methods to satisfy the IConsumer and IDownloadTarget interfaces - # (From the perspective of a downloader I am an IDownloadTarget and an IConsumer.) + # methods to satisfy the IConsumer and IDownloadTarget interfaces. (From + # the perspective of a downloader I am an IDownloadTarget and an + # IConsumer.) def registerProducer(self, producer, streaming): assert streaming # We know how to handle only streaming producers. self.producer = producer # the downloader @@ -192,12 +213,14 @@ class DownUpConnector(log.PrefixingLogMixin): return self._size_osol.when_fired() def get_all_encoding_parameters(self): # We have to learn the encoding params from pusher. - if hasattr(self, 'encodingparams'): # attribute created by self.set_encodingparams() + if hasattr(self, 'encodingparams'): + # attribute created by self.set_encodingparams() return defer.succeed(self.encodingparams) else: return self._encodingparams_osol.when_fired() def read_encrypted(self, length, hash_only): - """ Returns a deferred which eventually fired with the requested ciphertext. """ + """Returns a deferred which eventually fired with the requested + ciphertext.""" precondition(length) # please don't ask to read 0 bytes d = defer.Deferred() self.next_read_ds.append(d) @@ -206,7 +229,8 @@ class DownUpConnector(log.PrefixingLogMixin): return d def get_storage_index(self): # We have to learn the storage index from pusher. - if hasattr(self, 'storageindex'): # attribute created by self.set_storageindex() + if hasattr(self, 'storageindex'): + # attribute created by self.set_storageindex() return defer.succeed(self.storageindex) else: return self._storageindex.when_fired() -- 2.45.2