import collections

from zope.interface import implements
from twisted.internet import defer
from twisted.internet.interfaces import IConsumer

from allmydata.storage.server import si_b2a
from allmydata.util import log, observer
from allmydata.util.assertutil import precondition, _assert
from allmydata.uri import CHKFileVerifierURI
from allmydata.interfaces import IEncryptedUploadable, IDownloadTarget
from allmydata.immutable import download, upload
class Repairer(log.PrefixingLogMixin):
    """I generate any shares which were not available and upload them to
    servers.

    Which servers? Well, I just use the normal upload process, so any servers
    that will take shares. In fact, I even believe servers if they say that
    they already have shares even if attempts to download those shares would
    fail because the shares are corrupted.

    My process of uploading replacement shares proceeds in a segment-wise
    fashion -- first I ask servers if they can hold the new shares, and wait
    until enough have agreed then I download the first segment of the file
    and upload the first block of each replacement share, and only after all
    those blocks have been uploaded do I download the second segment of the
    file and upload the second block of each replacement share to its
    respective server. (I do it this way in order to minimize the amount of
    downloading I have to do and the amount of memory I have to use at any
    one time.)

    If any of the servers to which I am uploading replacement shares fails to
    accept the blocks during this process, then I just stop using that
    server, abandon any share-uploads that were going to that server, and
    proceed to finish uploading the remaining shares to their respective
    servers. At the end of my work, I produce an object which satisfies the
    ICheckAndRepairResults interface (by firing the deferred that I returned
    from start() and passing that check-and-repair-results object).

    Before I send any new request to a server, I always ask the 'monitor'
    object that was passed into my constructor whether this task has been
    cancelled (by invoking its raise_if_cancelled() method).
    """

    def __init__(self, client, verifycap, monitor):
        # precondition() raises AssertionError itself on failure (and returns
        # None on success), so it must not be wrapped in an 'assert' -- that
        # would fail unconditionally.
        precondition(isinstance(verifycap, CHKFileVerifierURI))

        logprefix = si_b2a(verifycap.storage_index)[:5]
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer",
                                       prefix=logprefix)

        self._client = client # needed by start() for the storage broker and uploader
        self._verifycap = verifycap
        self._monitor = monitor

    def start(self):
        """Download the ciphertext and simultaneously re-upload it.

        Returns a Deferred which fires with the upload results when the
        repair has succeeded, or errbacks if either the download or the
        upload fails or is stopped.
        """
        self.log("starting repair")
        duc = DownUpConnector()
        sb = self._client.get_storage_broker()
        dl = download.CiphertextDownloader(sb, self._verifycap, target=duc,
                                           monitor=self._monitor)
        ul = upload.CHKUploader(self._client)

        d = defer.Deferred()

        # If the upload or the download fails or is stopped, then the repair
        # failed. Guard against firing d twice (e.g. both the download and
        # the upload failing), which would raise AlreadyCalledError.
        def _errb(f):
            if not d.called:
                d.errback(f)

        # If the upload succeeds, then the repair has succeeded.
        def _cb(res):
            d.callback(res)
        ul.start(duc).addCallbacks(_cb, _errb)

        # If the download fails or is stopped, then the repair failed.
        d2 = dl.start()
        d2.addErrback(_errb)

        # We ignore the callback from d2. Is this right? Ugh.

        return d
class DownUpConnector(log.PrefixingLogMixin):
    implements(IEncryptedUploadable, IDownloadTarget, IConsumer)
    """I act like an 'encrypted uploadable' -- something that a local
    uploader can read ciphertext from in order to upload the ciphertext.
    However, unbeknownst to the uploader, I actually download the ciphertext
    from a CiphertextDownloader instance as it is needed.

    On the other hand, I act like a 'download target' -- something that a
    local downloader can write ciphertext to as it downloads the ciphertext.
    That downloader doesn't realize, of course, that I'm just turning around
    and giving the ciphertext to the uploader."""

    # The theory behind this class is nice: just satisfy two separate
    # interfaces. The implementation is slightly horrible, because of
    # "impedance mismatch" -- the downloader expects to be able to
    # synchronously push data in, and the uploader expects to be able to read
    # data out with a "read(THIS_SPECIFIC_LENGTH)" which returns a deferred.
    # The two interfaces have different APIs for pausing/unpausing. The
    # uploader requests metadata like size and encodingparams which the
    # downloader provides either eventually or not at all (okay I just now
    # extended the downloader to provide encodingparams). Most of this
    # slightly horrible code would disappear if CiphertextDownloader just
    # used this object as an IConsumer (plus maybe a couple of other methods)
    # and if the Uploader simply expected to be treated as an IConsumer (plus
    # maybe a couple of other things).

    def __init__(self, buflim=2**19):
        """If we're already holding at least buflim bytes, then tell the
        downloader to pause until we have less than buflim bytes."""
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer")
        self.buflim = buflim # pause/resume threshold, read by write() and
                             # _satisfy_reads_if_possible()
        self.bufs = collections.deque() # list of strings
        self.bufsiz = 0 # how many bytes total in bufs

        # list of deferreds which will fire with the requested ciphertext
        self.next_read_ds = collections.deque()

        # how many bytes of ciphertext were requested by each deferred
        self.next_read_lens = collections.deque()

        self._size_osol = observer.OneShotObserverList()
        self._encodingparams_osol = observer.OneShotObserverList()
        self._storageindex_osol = observer.OneShotObserverList()
        self._closed_to_pusher = False

        # The downloader (our producer); set by registerProducer().
        # Initialized to None so that a read arriving before registration
        # doesn't raise AttributeError in _satisfy_reads_if_possible().
        self.producer = None

        # set_encodingparams() will create the following attribute:

        # self.encodingparams # (provided by the object which is pushing data
        # into me, required by the object which is pulling data out of me)

        # open() will create the following attribute:
        # self.size # size of the whole file (provided by the object which is
        # pushing data into me, required by the object which is pulling data
        # out of me)

        # set_upload_status() will create the following attribute:

        # self.upload_status # XXX do we need to actually update this? Is
        # anybody watching the results during a repair?

    def _satisfy_reads_if_possible(self):
        """Fire any pending read_encrypted() deferreds for which enough
        ciphertext is buffered; once the pusher has closed, satisfy the rest
        with short reads."""
        assert bool(self.next_read_ds) == bool(self.next_read_lens)
        while self.next_read_ds and ((self.bufsiz >= self.next_read_lens[0])
                                     or self._closed_to_pusher):
            nrd = self.next_read_ds.popleft()
            nrl = self.next_read_lens.popleft()

            # Pick out the requested number of bytes from self.bufs, turn it
            # into a string, and callback the deferred with that.
            res = []
            ressize = 0
            while ressize < nrl and self.bufs:
                nextbuf = self.bufs.popleft()
                res.append(nextbuf)
                ressize += len(nextbuf)
                if ressize > nrl:
                    # We took too much: push the unconsumed *tail* of the
                    # buffer back onto the deque and keep only the head for
                    # this read. (Pushing back the head here would duplicate
                    # delivered bytes and lose the tail.)
                    extra = ressize - nrl
                    self.bufs.appendleft(nextbuf[-extra:])
                    res[-1] = nextbuf[:-extra]
            assert _assert(sum(len(x) for x in res) <= nrl, [len(x) for x in res], nrl)
            assert _assert(sum(len(x) for x in res) == nrl or self._closed_to_pusher, [len(x) for x in res], nrl)
            # Account for exactly the bytes handed out (a short read after
            # close removes fewer than nrl).
            self.bufsiz -= sum(len(x) for x in res)
            if self.bufsiz < self.buflim and self.producer:
                self.producer.resumeProducing()
            nrd.callback(res)

    # methods to satisfy the IConsumer and IDownloadTarget interfaces. (From
    # the perspective of a downloader I am an IDownloadTarget and an
    # IConsumer.)
    def registerProducer(self, producer, streaming):
        assert streaming # We know how to handle only streaming producers.
        self.producer = producer # the downloader
    def unregisterProducer(self):
        pass
    def open(self, size):
        self.size = size
        self._size_osol.fire(self.size)
    def set_encodingparams(self, encodingparams):
        self.encodingparams = encodingparams
        self._encodingparams_osol.fire(self.encodingparams)
    def set_storageindex(self, storageindex):
        self.storageindex = storageindex
        self._storageindex_osol.fire(self.storageindex)
    def write(self, data):
        precondition(data) # please don't write empty strings
        self.bufs.append(data)
        self.bufsiz += len(data)
        self._satisfy_reads_if_possible()
        if self.bufsiz >= self.buflim and self.producer:
            self.producer.pauseProducing()
    def close(self):
        self._closed_to_pusher = True
        # Any reads which haven't been satisfied by now are going to
        # have to be satisfied with short reads.
        self._satisfy_reads_if_possible()

    # methods to satisfy the IEncryptedUploader interface
    # (From the perspective of an uploader I am an IEncryptedUploadable.)
    def set_upload_status(self, upload_status):
        self.upload_status = upload_status
    def get_size(self):
        if hasattr(self, 'size'): # attribute created by self.open()
            return defer.succeed(self.size)
        else:
            return self._size_osol.when_fired()
    def get_all_encoding_parameters(self):
        # We have to learn the encoding params from pusher.
        if hasattr(self, 'encodingparams'):
            # attribute created by self.set_encodingparams()
            return defer.succeed(self.encodingparams)
        else:
            return self._encodingparams_osol.when_fired()
    def read_encrypted(self, length, hash_only):
        """Returns a deferred which eventually fires with the requested
        ciphertext, as a list of strings."""
        precondition(length) # please don't ask to read 0 bytes
        d = defer.Deferred()
        self.next_read_ds.append(d)
        self.next_read_lens.append(length)
        self._satisfy_reads_if_possible()
        return d
    def get_storage_index(self):
        # We have to learn the storage index from pusher.
        if hasattr(self, 'storageindex'):
            # attribute created by self.set_storageindex()
            return defer.succeed(self.storageindex)
        else:
            # BUGFIX: was self._storageindex (never assigned anywhere); the
            # observer list created in __init__ is self._storageindex_osol.
            return self._storageindex_osol.when_fired()