1 from zope.interface import implements
2 from twisted.internet import defer
3 from allmydata.storage.server import si_b2a
4 from allmydata.util import log, observer
5 from allmydata.util.assertutil import precondition, _assert
6 from allmydata.uri import CHKFileVerifierURI
7 from allmydata.interfaces import IEncryptedUploadable, IDownloadTarget
8 from twisted.internet.interfaces import IConsumer
10 from allmydata.immutable import download, upload
class Repairer(log.PrefixingLogMixin):
    """I generate any shares which were not available and upload them to
    servers.

    Which servers? Well, I just use the normal upload process, so any servers
    that will take shares. In fact, I even believe servers if they say that
    they already have shares even if attempts to download those shares would
    fail because the shares are corrupted.

    My process of uploading replacement shares proceeds in a segment-wise
    fashion -- first I ask servers if they can hold the new shares, and wait
    until enough have agreed then I download the first segment of the file
    and upload the first block of each replacement share, and only after all
    those blocks have been uploaded do I download the second segment of the
    file and upload the second block of each replacement share to its
    respective server. (I do it this way in order to minimize the amount of
    downloading I have to do and the amount of memory I have to use at any
    one time.)

    If any of the servers to which I am uploading replacement shares fails to
    accept the blocks during this process, then I just stop using that
    server, abandon any share-uploads that were going to that server, and
    proceed to finish uploading the remaining shares to their respective
    servers. At the end of my work, I produce an object which satisfies the
    ICheckAndRepairResults interface (by firing the deferred that I returned
    from start() and passing that check-and-repair-results object).

    Before I send any new request to a server, I always ask the 'monitor'
    object that was passed into my constructor whether this task has been
    cancelled (by invoking its raise_if_cancelled() method).
    """

    def __init__(self, storage_broker, secret_holder, verifycap, monitor):
        # precondition() itself raises on failure (it is called bare
        # elsewhere in this file); wrapping it in 'assert' would make the
        # check disappear under 'python -O'.
        precondition(isinstance(verifycap, CHKFileVerifierURI))

        logprefix = si_b2a(verifycap.storage_index)[:5]
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer",
                                       prefix=logprefix)

        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._verifycap = verifycap
        self._monitor = monitor

    def start(self):
        """Begin the repair: download the ciphertext and simultaneously
        re-upload it. Returns a deferred which fires with the upload results
        on success, or errbacks if the upload (or the download) fails or is
        stopped."""
        self.log("starting repair")
        duc = DownUpConnector()
        dl = download.CiphertextDownloader(self._storage_broker,
                                           self._verifycap, target=duc,
                                           monitor=self._monitor)
        ul = upload.CHKUploader(self._storage_broker, self._secret_holder)

        d = defer.Deferred()

        # If the upload or the download fails or is stopped, then the repair
        # failed.
        def _errb(f):
            d.errback(f)

        # If the upload succeeds, then the repair has succeeded.
        def _cb(res):
            d.callback(res)
        ul.start(duc).addCallbacks(_cb, _errb)

        # If the download fails or is stopped, then the repair failed.
        d2 = dl.start()
        d2.addErrback(_errb)

        # We ignore the callback from d2. Is this right? Ugh.

        return d
class DownUpConnector(log.PrefixingLogMixin):
    implements(IEncryptedUploadable, IDownloadTarget, IConsumer)
    """I act like an 'encrypted uploadable' -- something that a local
    uploader can read ciphertext from in order to upload the ciphertext.
    However, unbeknownst to the uploader, I actually download the ciphertext
    from a CiphertextDownloader instance as it is needed.

    On the other hand, I act like a 'download target' -- something that a
    local downloader can write ciphertext to as it downloads the ciphertext.
    That downloader doesn't realize, of course, that I'm just turning around
    and giving the ciphertext to the uploader."""

    # The theory behind this class is nice: just satisfy two separate
    # interfaces. The implementation is slightly horrible, because of
    # "impedance mismatch" -- the downloader expects to be able to
    # synchronously push data in, and the uploader expects to be able to read
    # data out with a "read(THIS_SPECIFIC_LENGTH)" which returns a deferred.
    # The two interfaces have different APIs for pausing/unpausing. The
    # uploader requests metadata like size and encodingparams which the
    # downloader provides either eventually or not at all (okay I just now
    # extended the downloader to provide encodingparams). Most of this
    # slightly horrible code would disappear if CiphertextDownloader just
    # used this object as an IConsumer (plus maybe a couple of other methods)
    # and if the Uploader simply expected to be treated as an IConsumer (plus
    # maybe a couple of other things).

    def __init__(self, buflim=2**19):
        """If we're already holding at least buflim bytes, then tell the
        downloader to pause until we have less than buflim bytes."""
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer")
        self.buflim = buflim # pause the producer at/above this many bytes
        self.bufs = collections.deque() # list of strings
        self.bufsiz = 0 # how many bytes total in bufs

        # The downloader attaches itself via registerProducer(); initialize
        # to None so write()/_satisfy_reads_if_possible() can safely test it
        # before (or after) registration.
        self.producer = None

        # list of deferreds which will fire with the requested ciphertext
        self.next_read_ds = collections.deque()

        # how many bytes of ciphertext were requested by each deferred
        self.next_read_lens = collections.deque()

        self._size_osol = observer.OneShotObserverList()
        self._encodingparams_osol = observer.OneShotObserverList()
        self._storageindex_osol = observer.OneShotObserverList()
        self._closed_to_pusher = False

        # set_encodingparams() will create the following attribute:
        # self.encodingparams # (provided by the object which is pushing data
        # into me, required by the object which is pulling data out of me)

        # open() will create the following attribute:
        # self.size # size of the whole file (provided by the object which is
        # pushing data into me, required by the object which is pulling data
        # out of me)

        # set_upload_status() will create the following attribute:
        # self.upload_status # XXX do we need to actually update this? Is
        # anybody watching the results during a repair?

    def _satisfy_reads_if_possible(self):
        """Fire as many pending read_encrypted() deferreds as the buffered
        ciphertext allows (once the pusher has closed us, remaining reads
        are satisfied with short reads). Each deferred gets a list of
        strings totalling at most the requested length."""
        assert bool(self.next_read_ds) == bool(self.next_read_lens)
        while self.next_read_ds and ((self.bufsiz >= self.next_read_lens[0])
                                     or self._closed_to_pusher):
            nrd = self.next_read_ds.popleft()
            nrl = self.next_read_lens.popleft()

            # Pick out the requested number of bytes from self.bufs, turn it
            # into a string, and callback the deferred with that.
            res = []
            ressize = 0
            while ressize < nrl and self.bufs:
                nextbuf = self.bufs.popleft()
                res.append(nextbuf)
                ressize += len(nextbuf)
                if ressize > nrl:
                    # Keep the head of the last buffer for this read and
                    # push the unconsumed tail back onto the front of the
                    # queue for the next read. (Pushing back the head --
                    # nextbuf[:-extra] -- would duplicate the head and lose
                    # the tail.)
                    extra = ressize - nrl
                    self.bufs.appendleft(nextbuf[-extra:])
                    res[-1] = nextbuf[:-extra]
            # _assert() raises on failure; wrapping it in 'assert' would
            # make the check disappear under 'python -O'.
            consumed = sum(len(x) for x in res)
            _assert(consumed <= nrl, [len(x) for x in res], nrl)
            _assert(consumed == nrl or self._closed_to_pusher,
                    [len(x) for x in res], nrl)
            self.bufsiz -= consumed
            if self.bufsiz < self.buflim and self.producer:
                self.producer.resumeProducing()
            nrd.callback(res)

    # methods to satisfy the IConsumer and IDownloadTarget interfaces. (From
    # the perspective of a downloader I am an IDownloadTarget and an
    # IConsumer.)
    def registerProducer(self, producer, streaming):
        assert streaming # We know how to handle only streaming producers.
        self.producer = producer # the downloader

    def unregisterProducer(self):
        self.producer = None

    def open(self, size):
        self.size = size
        self._size_osol.fire(self.size)

    def set_encodingparams(self, encodingparams):
        self.encodingparams = encodingparams
        self._encodingparams_osol.fire(self.encodingparams)

    def set_storageindex(self, storageindex):
        self.storageindex = storageindex
        self._storageindex_osol.fire(self.storageindex)

    def write(self, data):
        precondition(data) # please don't write empty strings
        self.bufs.append(data)
        self.bufsiz += len(data)
        self._satisfy_reads_if_possible()
        if self.bufsiz >= self.buflim and self.producer:
            self.producer.pauseProducing()

    def close(self):
        self._closed_to_pusher = True
        # Any reads which haven't been satisfied by now are going to
        # have to be satisfied with short reads.
        self._satisfy_reads_if_possible()

    # methods to satisfy the IEncryptedUploader interface
    # (From the perspective of an uploader I am an IEncryptedUploadable.)
    def set_upload_status(self, upload_status):
        self.upload_status = upload_status

    def get_size(self):
        if hasattr(self, 'size'): # attribute created by self.open()
            return defer.succeed(self.size)
        else:
            return self._size_osol.when_fired()

    def get_all_encoding_parameters(self):
        # We have to learn the encoding params from pusher.
        if hasattr(self, 'encodingparams'):
            # attribute created by self.set_encodingparams()
            return defer.succeed(self.encodingparams)
        else:
            return self._encodingparams_osol.when_fired()

    def read_encrypted(self, length, hash_only):
        """Returns a deferred which eventually fires with the requested
        ciphertext, as a list of strings."""
        precondition(length) # please don't ask to read 0 bytes
        d = defer.Deferred()
        self.next_read_ds.append(d)
        self.next_read_lens.append(length)
        self._satisfy_reads_if_possible()
        return d

    def get_storage_index(self):
        # We have to learn the storage index from pusher.
        if hasattr(self, 'storageindex'):
            # attribute created by self.set_storageindex()
            return defer.succeed(self.storageindex)
        else:
            # BUG FIX: was self._storageindex.when_fired(), but the observer
            # list created in __init__ is self._storageindex_osol; the old
            # name raised AttributeError on the not-yet-set path.
            return self._storageindex_osol.when_fired()