]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/repairer.py
d860718a1e94f9451ea92a463735916728ccc4ce
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / repairer.py
1 from zope.interface import implements
2 from twisted.internet import defer
3 from allmydata.storage.server import si_b2a
4 from allmydata.util import log, observer
5 from allmydata.util.assertutil import precondition, _assert
6 from allmydata.uri import CHKFileVerifierURI
7 from allmydata.interfaces import IEncryptedUploadable, IDownloadTarget
8 from twisted.internet.interfaces import IConsumer
9
10 from allmydata.immutable import download, upload
11
12 import collections
13
14 class Repairer(log.PrefixingLogMixin):
15     """I generate any shares which were not available and upload them to
16     servers.
17
18     Which servers? Well, I just use the normal upload process, so any servers
19     that will take shares. In fact, I even believe servers if they say that
20     they already have shares even if attempts to download those shares would
21     fail because the shares are corrupted.
22
23     My process of uploading replacement shares proceeds in a segment-wise
24     fashion -- first I ask servers if they can hold the new shares, and wait
25     until enough have agreed then I download the first segment of the file
26     and upload the first block of each replacement share, and only after all
27     those blocks have been uploaded do I download the second segment of the
28     file and upload the second block of each replacement share to its
29     respective server. (I do it this way in order to minimize the amount of
30     downloading I have to do and the amount of memory I have to use at any
31     one time.)
32
33     If any of the servers to which I am uploading replacement shares fails to
34     accept the blocks during this process, then I just stop using that
35     server, abandon any share-uploads that were going to that server, and
36     proceed to finish uploading the remaining shares to their respective
37     servers. At the end of my work, I produce an object which satisfies the
38     ICheckAndRepairResults interface (by firing the deferred that I returned
39     from start() and passing that check-and-repair-results object).
40
41     Before I send any new request to a server, I always ask the 'monitor'
42     object that was passed into my constructor whether this task has been
43     cancelled (by invoking its raise_if_cancelled() method).
44     """
45
46     def __init__(self, storage_broker, secret_holder, verifycap, monitor):
47         assert precondition(isinstance(verifycap, CHKFileVerifierURI))
48
49         logprefix = si_b2a(verifycap.storage_index)[:5]
50         log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer",
51                                        prefix=logprefix)
52
53         self._storage_broker = storage_broker
54         self._secret_holder = secret_holder
55         self._verifycap = verifycap
56         self._monitor = monitor
57
58     def start(self):
59         self.log("starting repair")
60         duc = DownUpConnector()
61         dl = download.CiphertextDownloader(self._storage_broker,
62                                            self._verifycap, target=duc,
63                                            monitor=self._monitor)
64         ul = upload.CHKUploader(self._storage_broker, self._secret_holder)
65
66         d = defer.Deferred()
67
68         # If the upload or the download fails or is stopped, then the repair
69         # failed.
70         def _errb(f):
71             d.errback(f)
72             return None
73
74         # If the upload succeeds, then the repair has succeeded.
75         def _cb(res):
76             d.callback(res)
77         ul.start(duc).addCallbacks(_cb, _errb)
78
79         # If the download fails or is stopped, then the repair failed.
80         d2 = dl.start()
81         d2.addErrback(_errb)
82
83         # We ignore the callback from d2.  Is this right?  Ugh.
84
85         return d
86
87 class DownUpConnector(log.PrefixingLogMixin):
88     implements(IEncryptedUploadable, IDownloadTarget, IConsumer)
89     """I act like an 'encrypted uploadable' -- something that a local
90     uploader can read ciphertext from in order to upload the ciphertext.
91     However, unbeknownst to the uploader, I actually download the ciphertext
92     from a CiphertextDownloader instance as it is needed.
93
94     On the other hand, I act like a 'download target' -- something that a
95     local downloader can write ciphertext to as it downloads the ciphertext.
96     That downloader doesn't realize, of course, that I'm just turning around
97     and giving the ciphertext to the uploader."""
98
99     # The theory behind this class is nice: just satisfy two separate
100     # interfaces. The implementation is slightly horrible, because of
101     # "impedance mismatch" -- the downloader expects to be able to
102     # synchronously push data in, and the uploader expects to be able to read
103     # data out with a "read(THIS_SPECIFIC_LENGTH)" which returns a deferred.
104     # The two interfaces have different APIs for pausing/unpausing. The
105     # uploader requests metadata like size and encodingparams which the
106     # downloader provides either eventually or not at all (okay I just now
107     # extended the downloader to provide encodingparams). Most of this
108     # slightly horrible code would disappear if CiphertextDownloader just
109     # used this object as an IConsumer (plus maybe a couple of other methods)
110     # and if the Uploader simply expected to be treated as an IConsumer (plus
111     # maybe a couple of other things).
112
113     def __init__(self, buflim=2**19):
114         """If we're already holding at least buflim bytes, then tell the
115         downloader to pause until we have less than buflim bytes."""
116         log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer")
117         self.buflim = buflim
118         self.bufs = collections.deque() # list of strings
119         self.bufsiz = 0 # how many bytes total in bufs
120
121         # list of deferreds which will fire with the requested ciphertext
122         self.next_read_ds = collections.deque()
123
124         # how many bytes of ciphertext were requested by each deferred
125         self.next_read_lens = collections.deque()
126
127         self._size_osol = observer.OneShotObserverList()
128         self._encodingparams_osol = observer.OneShotObserverList()
129         self._storageindex_osol = observer.OneShotObserverList()
130         self._closed_to_pusher = False
131
132         # once seg size is available, the following attribute will be created
133         # to hold it:
134
135         # self.encodingparams # (provided by the object which is pushing data
136         # into me, required by the object which is pulling data out of me)
137
138         # open() will create the following attribute:
139         # self.size # size of the whole file (provided by the object which is
140         # pushing data into me, required by the object which is pulling data
141         # out of me)
142
143         # set_upload_status() will create the following attribute:
144
145         # self.upload_status # XXX do we need to actually update this? Is
146         # anybody watching the results during a repair?
147
148     def _satisfy_reads_if_possible(self):
149         assert bool(self.next_read_ds) == bool(self.next_read_lens)
150         while self.next_read_ds and ((self.bufsiz >= self.next_read_lens[0])
151                                      or self._closed_to_pusher):
152             nrd = self.next_read_ds.popleft()
153             nrl = self.next_read_lens.popleft()
154
155             # Pick out the requested number of bytes from self.bufs, turn it
156             # into a string, and callback the deferred with that.
157             res = []
158             ressize = 0
159             while ressize < nrl and self.bufs:
160                 nextbuf = self.bufs.popleft()
161                 res.append(nextbuf)
162                 ressize += len(nextbuf)
163                 if ressize > nrl:
164                     extra = ressize - nrl
165                     self.bufs.appendleft(nextbuf[:-extra])
166                     res[-1] = nextbuf[:-extra]
167             assert _assert(sum(len(x) for x in res) <= nrl, [len(x) for x in res], nrl)
168             assert _assert(sum(len(x) for x in res) == nrl or self._closed_to_pusher, [len(x) for x in res], nrl)
169             self.bufsiz -= nrl
170             if self.bufsiz < self.buflim and self.producer:
171                 self.producer.resumeProducing()
172             nrd.callback(res)
173
174     # methods to satisfy the IConsumer and IDownloadTarget interfaces. (From
175     # the perspective of a downloader I am an IDownloadTarget and an
176     # IConsumer.)
177     def registerProducer(self, producer, streaming):
178         assert streaming # We know how to handle only streaming producers.
179         self.producer = producer # the downloader
180     def unregisterProducer(self):
181         self.producer = None
182     def open(self, size):
183         self.size = size
184         self._size_osol.fire(self.size)
185     def set_encodingparams(self, encodingparams):
186         self.encodingparams = encodingparams
187         self._encodingparams_osol.fire(self.encodingparams)
188     def set_storageindex(self, storageindex):
189         self.storageindex = storageindex
190         self._storageindex_osol.fire(self.storageindex)
191     def write(self, data):
192         precondition(data) # please don't write empty strings
193         self.bufs.append(data)
194         self.bufsiz += len(data)
195         self._satisfy_reads_if_possible()
196         if self.bufsiz >= self.buflim and self.producer:
197             self.producer.pauseProducing()
198     def finish(self):
199         pass
200     def close(self):
201         self._closed_to_pusher = True
202         # Any reads which haven't been satisfied by now are going to
203         # have to be satisfied with short reads.
204         self._satisfy_reads_if_possible()
205
206     # methods to satisfy the IEncryptedUploader interface
207     # (From the perspective of an uploader I am an IEncryptedUploadable.)
208     def set_upload_status(self, upload_status):
209         self.upload_status = upload_status
210     def get_size(self):
211         if hasattr(self, 'size'): # attribute created by self.open()
212             return defer.succeed(self.size)
213         else:
214             return self._size_osol.when_fired()
215     def get_all_encoding_parameters(self):
216         # We have to learn the encoding params from pusher.
217         if hasattr(self, 'encodingparams'):
218             # attribute created by self.set_encodingparams()
219             return defer.succeed(self.encodingparams)
220         else:
221             return self._encodingparams_osol.when_fired()
222     def read_encrypted(self, length, hash_only):
223         """Returns a deferred which eventually fired with the requested
224         ciphertext."""
225         precondition(length) # please don't ask to read 0 bytes
226         d = defer.Deferred()
227         self.next_read_ds.append(d)
228         self.next_read_lens.append(length)
229         self._satisfy_reads_if_possible()
230         return d
231     def get_storage_index(self):
232         # We have to learn the storage index from pusher.
233         if hasattr(self, 'storageindex'):
234             # attribute created by self.set_storageindex()
235             return defer.succeed(self.storageindex)
236         else:
237             return self._storageindex.when_fired()