]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/repairer.py
e04626cb878dcdc3b9e72e7957a131c5fc78b043
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / repairer.py
1 from zope.interface import implements
2 from twisted.internet import defer
3 from allmydata.storage.server import si_b2a
4 from allmydata.util import log, observer
5 from allmydata.util.assertutil import precondition, _assert
6 from allmydata.uri import CHKFileVerifierURI
7 from allmydata.interfaces import IEncryptedUploadable, IDownloadTarget
8 from twisted.internet.interfaces import IConsumer
9
10 from allmydata.immutable import download, upload
11
12 import collections
13
14 class Repairer(log.PrefixingLogMixin):
15     """I generate any shares which were not available and upload them to
16     servers.
17
18     Which servers? Well, I just use the normal upload process, so any servers
19     that will take shares. In fact, I even believe servers if they say that
20     they already have shares even if attempts to download those shares would
21     fail because the shares are corrupted.
22
23     My process of uploading replacement shares proceeds in a segment-wise
24     fashion -- first I ask servers if they can hold the new shares, and wait
25     until enough have agreed then I download the first segment of the file
26     and upload the first block of each replacement share, and only after all
27     those blocks have been uploaded do I download the second segment of the
28     file and upload the second block of each replacement share to its
29     respective server. (I do it this way in order to minimize the amount of
30     downloading I have to do and the amount of memory I have to use at any
31     one time.)
32
33     If any of the servers to which I am uploading replacement shares fails to
34     accept the blocks during this process, then I just stop using that
35     server, abandon any share-uploads that were going to that server, and
36     proceed to finish uploading the remaining shares to their respective
37     servers. At the end of my work, I produce an object which satisfies the
38     ICheckAndRepairResults interface (by firing the deferred that I returned
39     from start() and passing that check-and-repair-results object).
40
41     Before I send any new request to a server, I always ask the 'monitor'
42     object that was passed into my constructor whether this task has been
43     cancelled (by invoking its raise_if_cancelled() method).
44     """
45
46     def __init__(self, client, verifycap, monitor):
47         assert precondition(isinstance(verifycap, CHKFileVerifierURI))
48
49         logprefix = si_b2a(verifycap.storage_index)[:5]
50         log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer",
51                                        prefix=logprefix)
52
53         self._client = client
54         self._verifycap = verifycap
55         self._monitor = monitor
56
57     def start(self):
58         self.log("starting repair")
59         duc = DownUpConnector()
60         sb = self._client.get_storage_broker()
61         dl = download.CiphertextDownloader(sb, self._verifycap, target=duc,
62                                            monitor=self._monitor)
63         ul = upload.CHKUploader(self._client)
64
65         d = defer.Deferred()
66
67         # If the upload or the download fails or is stopped, then the repair
68         # failed.
69         def _errb(f):
70             d.errback(f)
71             return None
72
73         # If the upload succeeds, then the repair has succeeded.
74         def _cb(res):
75             d.callback(res)
76         ul.start(duc).addCallbacks(_cb, _errb)
77
78         # If the download fails or is stopped, then the repair failed.
79         d2 = dl.start()
80         d2.addErrback(_errb)
81
82         # We ignore the callback from d2.  Is this right?  Ugh.
83
84         return d
85
86 class DownUpConnector(log.PrefixingLogMixin):
87     implements(IEncryptedUploadable, IDownloadTarget, IConsumer)
88     """I act like an 'encrypted uploadable' -- something that a local
89     uploader can read ciphertext from in order to upload the ciphertext.
90     However, unbeknownst to the uploader, I actually download the ciphertext
91     from a CiphertextDownloader instance as it is needed.
92
93     On the other hand, I act like a 'download target' -- something that a
94     local downloader can write ciphertext to as it downloads the ciphertext.
95     That downloader doesn't realize, of course, that I'm just turning around
96     and giving the ciphertext to the uploader."""
97
98     # The theory behind this class is nice: just satisfy two separate
99     # interfaces. The implementation is slightly horrible, because of
100     # "impedance mismatch" -- the downloader expects to be able to
101     # synchronously push data in, and the uploader expects to be able to read
102     # data out with a "read(THIS_SPECIFIC_LENGTH)" which returns a deferred.
103     # The two interfaces have different APIs for pausing/unpausing. The
104     # uploader requests metadata like size and encodingparams which the
105     # downloader provides either eventually or not at all (okay I just now
106     # extended the downloader to provide encodingparams). Most of this
107     # slightly horrible code would disappear if CiphertextDownloader just
108     # used this object as an IConsumer (plus maybe a couple of other methods)
109     # and if the Uploader simply expected to be treated as an IConsumer (plus
110     # maybe a couple of other things).
111
112     def __init__(self, buflim=2**19):
113         """If we're already holding at least buflim bytes, then tell the
114         downloader to pause until we have less than buflim bytes."""
115         log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer")
116         self.buflim = buflim
117         self.bufs = collections.deque() # list of strings
118         self.bufsiz = 0 # how many bytes total in bufs
119
120         # list of deferreds which will fire with the requested ciphertext
121         self.next_read_ds = collections.deque()
122
123         # how many bytes of ciphertext were requested by each deferred
124         self.next_read_lens = collections.deque()
125
126         self._size_osol = observer.OneShotObserverList()
127         self._encodingparams_osol = observer.OneShotObserverList()
128         self._storageindex_osol = observer.OneShotObserverList()
129         self._closed_to_pusher = False
130
131         # once seg size is available, the following attribute will be created
132         # to hold it:
133
134         # self.encodingparams # (provided by the object which is pushing data
135         # into me, required by the object which is pulling data out of me)
136
137         # open() will create the following attribute:
138         # self.size # size of the whole file (provided by the object which is
139         # pushing data into me, required by the object which is pulling data
140         # out of me)
141
142         # set_upload_status() will create the following attribute:
143
144         # self.upload_status # XXX do we need to actually update this? Is
145         # anybody watching the results during a repair?
146
147     def _satisfy_reads_if_possible(self):
148         assert bool(self.next_read_ds) == bool(self.next_read_lens)
149         while self.next_read_ds and ((self.bufsiz >= self.next_read_lens[0])
150                                      or self._closed_to_pusher):
151             nrd = self.next_read_ds.popleft()
152             nrl = self.next_read_lens.popleft()
153
154             # Pick out the requested number of bytes from self.bufs, turn it
155             # into a string, and callback the deferred with that.
156             res = []
157             ressize = 0
158             while ressize < nrl and self.bufs:
159                 nextbuf = self.bufs.popleft()
160                 res.append(nextbuf)
161                 ressize += len(nextbuf)
162                 if ressize > nrl:
163                     extra = ressize - nrl
164                     self.bufs.appendleft(nextbuf[:-extra])
165                     res[-1] = nextbuf[:-extra]
166             assert _assert(sum(len(x) for x in res) <= nrl, [len(x) for x in res], nrl)
167             assert _assert(sum(len(x) for x in res) == nrl or self._closed_to_pusher, [len(x) for x in res], nrl)
168             self.bufsiz -= nrl
169             if self.bufsiz < self.buflim and self.producer:
170                 self.producer.resumeProducing()
171             nrd.callback(res)
172
173     # methods to satisfy the IConsumer and IDownloadTarget interfaces. (From
174     # the perspective of a downloader I am an IDownloadTarget and an
175     # IConsumer.)
176     def registerProducer(self, producer, streaming):
177         assert streaming # We know how to handle only streaming producers.
178         self.producer = producer # the downloader
179     def unregisterProducer(self):
180         self.producer = None
181     def open(self, size):
182         self.size = size
183         self._size_osol.fire(self.size)
184     def set_encodingparams(self, encodingparams):
185         self.encodingparams = encodingparams
186         self._encodingparams_osol.fire(self.encodingparams)
187     def set_storageindex(self, storageindex):
188         self.storageindex = storageindex
189         self._storageindex_osol.fire(self.storageindex)
190     def write(self, data):
191         precondition(data) # please don't write empty strings
192         self.bufs.append(data)
193         self.bufsiz += len(data)
194         self._satisfy_reads_if_possible()
195         if self.bufsiz >= self.buflim and self.producer:
196             self.producer.pauseProducing()
197     def finish(self):
198         pass
199     def close(self):
200         self._closed_to_pusher = True
201         # Any reads which haven't been satisfied by now are going to
202         # have to be satisfied with short reads.
203         self._satisfy_reads_if_possible()
204
205     # methods to satisfy the IEncryptedUploader interface
206     # (From the perspective of an uploader I am an IEncryptedUploadable.)
207     def set_upload_status(self, upload_status):
208         self.upload_status = upload_status
209     def get_size(self):
210         if hasattr(self, 'size'): # attribute created by self.open()
211             return defer.succeed(self.size)
212         else:
213             return self._size_osol.when_fired()
214     def get_all_encoding_parameters(self):
215         # We have to learn the encoding params from pusher.
216         if hasattr(self, 'encodingparams'):
217             # attribute created by self.set_encodingparams()
218             return defer.succeed(self.encodingparams)
219         else:
220             return self._encodingparams_osol.when_fired()
221     def read_encrypted(self, length, hash_only):
222         """Returns a deferred which eventually fired with the requested
223         ciphertext."""
224         precondition(length) # please don't ask to read 0 bytes
225         d = defer.Deferred()
226         self.next_read_ds.append(d)
227         self.next_read_lens.append(length)
228         self._satisfy_reads_if_possible()
229         return d
230     def get_storage_index(self):
231         # We have to learn the storage index from pusher.
232         if hasattr(self, 'storageindex'):
233             # attribute created by self.set_storageindex()
234             return defer.succeed(self.storageindex)
235         else:
236             return self._storageindex.when_fired()