]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/downloader/share.py
d51270212aa7c8f9e9cc562c391b164a30dc8c53
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / downloader / share.py
1
2 import struct
3 import time
4 now = time.time
5
6 from twisted.python.failure import Failure
7 from foolscap.api import eventually
8 from allmydata.util import base32, log, hashutil, mathutil
9 from allmydata.util.spans import Spans, DataSpans
10 from allmydata.interfaces import HASH_SIZE
11 from allmydata.hashtree import IncompleteHashTree, BadHashError, \
12      NotEnoughHashesError
13
14 from allmydata.immutable.layout import make_write_bucket_proxy
15 from allmydata.util.observer import EventStreamObserver
16 from common import COMPLETE, CORRUPT, DEAD, BADSEGNUM
17
18
19 class LayoutInvalid(Exception):
20     pass
21 class DataUnavailable(Exception):
22     pass
23
24 class Share:
25     """I represent a single instance of a single share (e.g. I reference the
26     shnum2 for share SI=abcde on server xy12t, not the one on server ab45q).
27     I am associated with a CommonShare that remembers data that is held in
28     common among e.g. SI=abcde/shnum2 across all servers. I am also
29     associated with a CiphertextFileNode for e.g. SI=abcde (all shares, all
30     servers).
31     """
32     # this is a specific implementation of IShare for tahoe's native storage
33     # servers. A different backend would use a different class.
34
35     def __init__(self, rref, server, verifycap, commonshare, node,
36                  download_status, shnum, dyhb_rtt, logparent):
37         self._rref = rref
38         self._server = server
39         self._node = node # holds share_hash_tree and UEB
40         self.actual_segment_size = node.segment_size # might still be None
41         # XXX change node.guessed_segment_size to
42         # node.best_guess_segment_size(), which should give us the real ones
43         # if known, else its guess.
44         self._guess_offsets(verifycap, node.guessed_segment_size)
45         self.actual_offsets = None
46         self._UEB_length = None
47         self._commonshare = commonshare # holds block_hash_tree
48         self._download_status = download_status
49         self._storage_index = verifycap.storage_index
50         self._si_prefix = base32.b2a(verifycap.storage_index)[:8]
51         self._shnum = shnum
52         self._dyhb_rtt = dyhb_rtt
53         # self._alive becomes False upon fatal corruption or server error
54         self._alive = True
55         self._loop_scheduled = False
56         self._lp = log.msg(format="%(share)s created", share=repr(self),
57                            level=log.NOISY, parent=logparent, umid="P7hv2w")
58
59         self._pending = Spans() # request sent but no response received yet
60         self._received = DataSpans() # ACK response received, with data
61         self._unavailable = Spans() # NAK response received, no data
62
63         # any given byte of the share can be in one of four states:
64         #  in: _wanted, _requested, _received
65         #      FALSE    FALSE       FALSE : don't care about it at all
66         #      TRUE     FALSE       FALSE : want it, haven't yet asked for it
67         #      TRUE     TRUE        FALSE : request is in-flight
68         #                                   or didn't get it
69         #      FALSE    TRUE        TRUE  : got it, haven't used it yet
70         #      FALSE    TRUE        FALSE : got it and used it
71         #      FALSE    FALSE       FALSE : block consumed, ready to ask again
72         #
73         # when we request data and get a NAK, we leave it in _requested
74         # to remind ourself to not ask for it again. We don't explicitly
75         # remove it from anything (maybe this should change).
76         #
77         # We retain the hashtrees in the Node, so we leave those spans in
78         # _requested (and never ask for them again, as long as the Node is
79         # alive). But we don't retain data blocks (too big), so when we
80         # consume a data block, we remove it from _requested, so a later
81         # download can re-fetch it.
82
83         self._requested_blocks = [] # (segnum, set(observer2..))
84         v = server.get_version()
85         ver = v["http://allmydata.org/tahoe/protocols/storage/v1"]
86         self._overrun_ok = ver["tolerates-immutable-read-overrun"]
87         # If _overrun_ok and we guess the offsets correctly, we can get
88         # everything in one RTT. If _overrun_ok and we guess wrong, we might
89         # need two RTT (but we could get lucky and do it in one). If overrun
90         # is *not* ok (tahoe-1.3.0 or earlier), we need four RTT: 1=version,
91         # 2=offset table, 3=UEB_length and everything else (hashes, block),
92         # 4=UEB.
93
94         self.had_corruption = False # for unit tests
95
96     def __repr__(self):
97         return "Share(sh%d-on-%s)" % (self._shnum, self._server.get_name())
98
99     def is_alive(self):
100         # XXX: reconsider. If the share sees a single error, should it remain
101         # dead for all time? Or should the next segment try again? This DEAD
102         # state is stored elsewhere too (SegmentFetcher per-share states?)
103         # and needs to be consistent. We clear _alive in self._fail(), which
104         # is called upon a network error, or layout failure, or hash failure
105         # in the UEB or a hash tree. We do not _fail() for a hash failure in
106         # a block, but of course we still tell our callers about
107         # state=CORRUPT so they'll find a different share.
108         return self._alive
109
110     def _guess_offsets(self, verifycap, guessed_segment_size):
111         self.guessed_segment_size = guessed_segment_size
112         size = verifycap.size
113         k = verifycap.needed_shares
114         N = verifycap.total_shares
115         r = self._node._calculate_sizes(guessed_segment_size)
116         # num_segments, block_size/tail_block_size
117         # guessed_segment_size/tail_segment_size/tail_segment_padded
118         share_size = mathutil.div_ceil(size, k)
119         # share_size is the amount of block data that will be put into each
120         # share, summed over all segments. It does not include hashes, the
121         # UEB, or other overhead.
122
123         # use the upload-side code to get this as accurate as possible
124         ht = IncompleteHashTree(N)
125         num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
126         wbp = make_write_bucket_proxy(None, None, share_size, r["block_size"],
127                                       r["num_segments"], num_share_hashes, 0)
128         self._fieldsize = wbp.fieldsize
129         self._fieldstruct = wbp.fieldstruct
130         self.guessed_offsets = wbp._offsets
131
132     # called by our client, the SegmentFetcher
133     def get_block(self, segnum):
134         """Add a block number to the list of requests. This will eventually
135         result in a fetch of the data necessary to validate the block, then
136         the block itself. The fetch order is generally
137         first-come-first-served, but requests may be answered out-of-order if
138         data becomes available sooner.
139
140         I return an EventStreamObserver, which has two uses. The first is to
141         call o.subscribe(), which gives me a place to send state changes and
142         eventually the data block. The second is o.cancel(), which removes
143         the request (if it is still active).
144
145         I will distribute the following events through my EventStreamObserver:
146          - state=OVERDUE: ?? I believe I should have had an answer by now.
147                           You may want to ask another share instead.
148          - state=BADSEGNUM: the segnum you asked for is too large. I must
149                             fetch a valid UEB before I can determine this,
150                             so the notification is asynchronous
151          - state=COMPLETE, block=data: here is a valid block
152          - state=CORRUPT: this share contains corrupted data
153          - state=DEAD, f=Failure: the server reported an error, this share
154                                   is unusable
155         """
156         log.msg("%s.get_block(%d)" % (repr(self), segnum),
157                 level=log.NOISY, parent=self._lp, umid="RTo9MQ")
158         assert segnum >= 0
159         o = EventStreamObserver()
160         o.set_canceler(self, "_cancel_block_request")
161         for i,(segnum0,observers) in enumerate(self._requested_blocks):
162             if segnum0 == segnum:
163                 observers.add(o)
164                 break
165         else:
166             self._requested_blocks.append( (segnum, set([o])) )
167         self.schedule_loop()
168         return o
169
170     def _cancel_block_request(self, o):
171         new_requests = []
172         for e in self._requested_blocks:
173             (segnum0, observers) = e
174             observers.discard(o)
175             if observers:
176                 new_requests.append(e)
177         self._requested_blocks = new_requests
178
179     # internal methods
180     def _active_segnum_and_observers(self):
181         if self._requested_blocks:
182             # we only retrieve information for one segment at a time, to
183             # minimize alacrity (first come, first served)
184             return self._requested_blocks[0]
185         return None, []
186
187     def schedule_loop(self):
188         if self._loop_scheduled:
189             return
190         self._loop_scheduled = True
191         eventually(self.loop)
192
193     def loop(self):
194         self._loop_scheduled = False
195         if not self._alive:
196             return
197         try:
198             # if any exceptions occur here, kill the download
199             log.msg("%s.loop, reqs=[%s], pending=%s, received=%s,"
200                     " unavailable=%s" %
201                     (repr(self),
202                      ",".join([str(req[0]) for req in self._requested_blocks]),
203                      self._pending.dump(), self._received.dump(),
204                      self._unavailable.dump() ),
205                     level=log.NOISY, parent=self._lp, umid="BaL1zw")
206             self._do_loop()
207             # all exception cases call self._fail(), which clears self._alive
208         except (BadHashError, NotEnoughHashesError, LayoutInvalid), e:
209             # Abandon this share. We do this if we see corruption in the
210             # offset table, the UEB, or a hash tree. We don't abandon the
211             # whole share if we see corruption in a data block (we abandon
212             # just the one block, and still try to get data from other blocks
213             # on the same server). In theory, we could get good data from a
214             # share with a corrupt UEB (by first getting the UEB from some
215             # other share), or corrupt hash trees, but the logic to decide
216             # when this is safe is non-trivial. So for now, give up at the
217             # first sign of corruption.
218             #
219             # _satisfy_*() code which detects corruption should first call
220             # self._signal_corruption(), and then raise the exception.
221             log.msg(format="corruption detected in %(share)s",
222                     share=repr(self),
223                     level=log.UNUSUAL, parent=self._lp, umid="gWspVw")
224             self._fail(Failure(e), log.UNUSUAL)
225         except DataUnavailable, e:
226             # Abandon this share.
227             log.msg(format="need data that will never be available"
228                     " from %s: pending=%s, received=%s, unavailable=%s" %
229                     (repr(self),
230                      self._pending.dump(), self._received.dump(),
231                      self._unavailable.dump() ),
232                     level=log.UNUSUAL, parent=self._lp, umid="F7yJnQ")
233             self._fail(Failure(e), log.UNUSUAL)
234         except BaseException:
235             self._fail(Failure())
236             raise
237         log.msg("%s.loop done, reqs=[%s], pending=%s, received=%s,"
238                 " unavailable=%s" %
239                 (repr(self),
240                  ",".join([str(req[0]) for req in self._requested_blocks]),
241                  self._pending.dump(), self._received.dump(),
242                  self._unavailable.dump() ),
243                 level=log.NOISY, parent=self._lp, umid="9lRaRA")
244
245     def _do_loop(self):
246         # we are (eventually) called after all state transitions:
247         #  new segments added to self._requested_blocks
248         #  new data received from servers (responses to our read() calls)
249         #  impatience timer fires (server appears slow)
250
251         # First, consume all of the information that we currently have, for
252         # all the segments people currently want.
253         while self._get_satisfaction():
254             pass
255
256         # When we get no satisfaction (from the data we've received so far),
257         # we determine what data we desire (to satisfy more requests). The
258         # number of segments is finite, so I can't get no satisfaction
259         # forever.
260         wanted, needed = self._desire()
261
262         # Finally, send out requests for whatever we need (desire minus
263         # have). You can't always get what you want, but if you try
264         # sometimes, you just might find, you get what you need.
265         self._send_requests(wanted + needed)
266
267         # and sometimes you can't even get what you need
268         disappointment = needed & self._unavailable
269         if disappointment.len():
270             self.had_corruption = True
271             raise DataUnavailable("need %s but will never get it" %
272                                   disappointment.dump())
273
274     def _get_satisfaction(self):
275         # return True if we retired a data block, and should therefore be
276         # called again. Return False if we don't retire a data block (even if
277         # we do retire some other data, like hash chains).
278
279         if self.actual_offsets is None:
280             if not self._satisfy_offsets():
281                 # can't even look at anything without the offset table
282                 return False
283
284         if not self._node.have_UEB:
285             if not self._satisfy_UEB():
286                 # can't check any hashes without the UEB
287                 return False
288             # the call to _satisfy_UEB() will immediately set the
289             # authoritative num_segments in all our CommonShares. If we
290             # guessed wrong, we might stil be working on a bogus segnum
291             # (beyond the real range). We catch this and signal BADSEGNUM
292             # before invoking any further code that touches hashtrees.
293         self.actual_segment_size = self._node.segment_size # might be updated
294         assert self.actual_segment_size is not None
295
296         # knowing the UEB means knowing num_segments
297         assert self._node.num_segments is not None
298
299         segnum, observers = self._active_segnum_and_observers()
300         # if segnum is None, we don't really need to do anything (we have no
301         # outstanding readers right now), but we'll fill in the bits that
302         # aren't tied to any particular segment.
303
304         if segnum is not None and segnum >= self._node.num_segments:
305             for o in observers:
306                 o.notify(state=BADSEGNUM)
307             self._requested_blocks.pop(0)
308             return True
309
310         if self._node.share_hash_tree.needed_hashes(self._shnum):
311             if not self._satisfy_share_hash_tree():
312                 # can't check block_hash_tree without a root
313                 return False
314
315         if self._commonshare.need_block_hash_root():
316             block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum)
317             self._commonshare.set_block_hash_root(block_hash_root)
318
319         if segnum is None:
320             return False # we don't want any particular segment right now
321
322         # block_hash_tree
323         needed_hashes = self._commonshare.get_needed_block_hashes(segnum)
324         if needed_hashes:
325             if not self._satisfy_block_hash_tree(needed_hashes):
326                 # can't check block without block_hash_tree
327                 return False
328
329         # ciphertext_hash_tree
330         needed_hashes = self._node.get_needed_ciphertext_hashes(segnum)
331         if needed_hashes:
332             if not self._satisfy_ciphertext_hash_tree(needed_hashes):
333                 # can't check decoded blocks without ciphertext_hash_tree
334                 return False
335
336         # data blocks
337         return self._satisfy_data_block(segnum, observers)
338
339     def _satisfy_offsets(self):
340         version_s = self._received.get(0, 4)
341         if version_s is None:
342             return False
343         (version,) = struct.unpack(">L", version_s)
344         if version == 1:
345             table_start = 0x0c
346             self._fieldsize = 0x4
347             self._fieldstruct = "L"
348         elif version == 2:
349             table_start = 0x14
350             self._fieldsize = 0x8
351             self._fieldstruct = "Q"
352         else:
353             self.had_corruption = True
354             raise LayoutInvalid("unknown version %d (I understand 1 and 2)"
355                                 % version)
356         offset_table_size = 6 * self._fieldsize
357         table_s = self._received.pop(table_start, offset_table_size)
358         if table_s is None:
359             return False
360         fields = struct.unpack(">"+6*self._fieldstruct, table_s)
361         offsets = {}
362         for i,field in enumerate(['data',
363                                   'plaintext_hash_tree', # UNUSED
364                                   'crypttext_hash_tree',
365                                   'block_hashes',
366                                   'share_hashes',
367                                   'uri_extension',
368                                   ] ):
369             offsets[field] = fields[i]
370         self.actual_offsets = offsets
371         log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields),
372                 level=log.NOISY, parent=self._lp, umid="jedQcw")
373         self._received.remove(0, 4) # don't need this anymore
374
375         # validate the offsets a bit
376         share_hashes_size = offsets["uri_extension"] - offsets["share_hashes"]
377         if share_hashes_size < 0 or share_hashes_size % (2+HASH_SIZE) != 0:
378             # the share hash chain is stored as (hashnum,hash) pairs
379             self.had_corruption = True
380             raise LayoutInvalid("share hashes malformed -- should be a"
381                                 " multiple of %d bytes -- not %d" %
382                                 (2+HASH_SIZE, share_hashes_size))
383         block_hashes_size = offsets["share_hashes"] - offsets["block_hashes"]
384         if block_hashes_size < 0 or block_hashes_size % (HASH_SIZE) != 0:
385             # the block hash tree is stored as a list of hashes
386             self.had_corruption = True
387             raise LayoutInvalid("block hashes malformed -- should be a"
388                                 " multiple of %d bytes -- not %d" %
389                                 (HASH_SIZE, block_hashes_size))
390         # we only look at 'crypttext_hash_tree' if the UEB says we're
391         # actually using it. Same with 'plaintext_hash_tree'. This gives us
392         # some wiggle room: a place to stash data for later extensions.
393
394         return True
395
396     def _satisfy_UEB(self):
397         o = self.actual_offsets
398         fsize = self._fieldsize
399         UEB_length_s = self._received.get(o["uri_extension"], fsize)
400         if not UEB_length_s:
401             return False
402         (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s)
403         UEB_s = self._received.pop(o["uri_extension"]+fsize, UEB_length)
404         if not UEB_s:
405             return False
406         self._received.remove(o["uri_extension"], fsize)
407         try:
408             self._node.validate_and_store_UEB(UEB_s)
409             return True
410         except (LayoutInvalid, BadHashError), e:
411             # TODO: if this UEB was bad, we'll keep trying to validate it
412             # over and over again. Only log.err on the first one, or better
413             # yet skip all but the first
414             f = Failure(e)
415             self._signal_corruption(f, o["uri_extension"], fsize+UEB_length)
416             self.had_corruption = True
417             raise
418
419     def _satisfy_share_hash_tree(self):
420         # the share hash chain is stored as (hashnum,hash) tuples, so you
421         # can't fetch just the pieces you need, because you don't know
422         # exactly where they are. So fetch everything, and parse the results
423         # later.
424         o = self.actual_offsets
425         hashlen = o["uri_extension"] - o["share_hashes"]
426         assert hashlen % (2+HASH_SIZE) == 0
427         hashdata = self._received.get(o["share_hashes"], hashlen)
428         if not hashdata:
429             return False
430         share_hashes = {}
431         for i in range(0, hashlen, 2+HASH_SIZE):
432             (hashnum,) = struct.unpack(">H", hashdata[i:i+2])
433             hashvalue = hashdata[i+2:i+2+HASH_SIZE]
434             share_hashes[hashnum] = hashvalue
435         # TODO: if they give us an empty set of hashes,
436         # process_share_hashes() won't fail. We must ensure that this
437         # situation doesn't allow unverified shares through. Manual testing
438         # shows that set_block_hash_root() throws an assert because an
439         # internal node is None instead of an actual hash, but we want
440         # something better. It's probably best to add a method to
441         # IncompleteHashTree which takes a leaf number and raises an
442         # exception unless that leaf is present and fully validated.
443         try:
444             self._node.process_share_hashes(share_hashes)
445             # adds to self._node.share_hash_tree
446         except (BadHashError, NotEnoughHashesError), e:
447             f = Failure(e)
448             self._signal_corruption(f, o["share_hashes"], hashlen)
449             self.had_corruption = True
450             raise
451         self._received.remove(o["share_hashes"], hashlen)
452         return True
453
454     def _signal_corruption(self, f, start, offset):
455         # there was corruption somewhere in the given range
456         reason = "corruption in share[%d-%d): %s" % (start, start+offset,
457                                                      str(f.value))
458         self._rref.callRemoteOnly("advise_corrupt_share", reason)
459
460     def _satisfy_block_hash_tree(self, needed_hashes):
461         o_bh = self.actual_offsets["block_hashes"]
462         block_hashes = {}
463         for hashnum in needed_hashes:
464             hashdata = self._received.get(o_bh+hashnum*HASH_SIZE, HASH_SIZE)
465             if hashdata:
466                 block_hashes[hashnum] = hashdata
467             else:
468                 return False # missing some hashes
469         # note that we don't submit any hashes to the block_hash_tree until
470         # we've gotten them all, because the hash tree will throw an
471         # exception if we only give it a partial set (which it therefore
472         # cannot validate)
473         try:
474             self._commonshare.process_block_hashes(block_hashes)
475         except (BadHashError, NotEnoughHashesError), e:
476             f = Failure(e)
477             hashnums = ",".join([str(n) for n in sorted(block_hashes.keys())])
478             log.msg(format="hash failure in block_hashes=(%(hashnums)s),"
479                     " from %(share)s",
480                     hashnums=hashnums, shnum=self._shnum, share=repr(self),
481                     failure=f, level=log.WEIRD, parent=self._lp, umid="yNyFdA")
482             hsize = max(0, max(needed_hashes)) * HASH_SIZE
483             self._signal_corruption(f, o_bh, hsize)
484             self.had_corruption = True
485             raise
486         for hashnum in needed_hashes:
487             self._received.remove(o_bh+hashnum*HASH_SIZE, HASH_SIZE)
488         return True
489
490     def _satisfy_ciphertext_hash_tree(self, needed_hashes):
491         start = self.actual_offsets["crypttext_hash_tree"]
492         hashes = {}
493         for hashnum in needed_hashes:
494             hashdata = self._received.get(start+hashnum*HASH_SIZE, HASH_SIZE)
495             if hashdata:
496                 hashes[hashnum] = hashdata
497             else:
498                 return False # missing some hashes
499         # we don't submit any hashes to the ciphertext_hash_tree until we've
500         # gotten them all
501         try:
502             self._node.process_ciphertext_hashes(hashes)
503         except (BadHashError, NotEnoughHashesError), e:
504             f = Failure(e)
505             hashnums = ",".join([str(n) for n in sorted(hashes.keys())])
506             log.msg(format="hash failure in ciphertext_hashes=(%(hashnums)s),"
507                     " from %(share)s",
508                     hashnums=hashnums, share=repr(self), failure=f,
509                     level=log.WEIRD, parent=self._lp, umid="iZI0TA")
510             hsize = max(0, max(needed_hashes))*HASH_SIZE
511             self._signal_corruption(f, start, hsize)
512             self.had_corruption = True
513             raise
514         for hashnum in needed_hashes:
515             self._received.remove(start+hashnum*HASH_SIZE, HASH_SIZE)
516         return True
517
518     def _satisfy_data_block(self, segnum, observers):
519         tail = (segnum == self._node.num_segments-1)
520         datastart = self.actual_offsets["data"]
521         blockstart = datastart + segnum * self._node.block_size
522         blocklen = self._node.block_size
523         if tail:
524             blocklen = self._node.tail_block_size
525
526         block = self._received.pop(blockstart, blocklen)
527         if not block:
528             log.msg("no data for block %s (want [%d:+%d])" % (repr(self),
529                                                               blockstart, blocklen),
530                     level=log.NOISY, parent=self._lp, umid="aK0RFw")
531             return False
532         log.msg(format="%(share)s._satisfy_data_block [%(start)d:+%(length)d]",
533                 share=repr(self), start=blockstart, length=blocklen,
534                 level=log.NOISY, parent=self._lp, umid="uTDNZg")
535         # this block is being retired, either as COMPLETE or CORRUPT, since
536         # no further data reads will help
537         assert self._requested_blocks[0][0] == segnum
538         try:
539             self._commonshare.check_block(segnum, block)
540             # hurrah, we have a valid block. Deliver it.
541             for o in observers:
542                 # goes to SegmentFetcher._block_request_activity
543                 o.notify(state=COMPLETE, block=block)
544             # now clear our received data, to dodge the #1170 spans.py
545             # complexity bug
546             self._received = DataSpans()
547         except (BadHashError, NotEnoughHashesError), e:
548             # rats, we have a corrupt block. Notify our clients that they
549             # need to look elsewhere, and advise the server. Unlike
550             # corruption in other parts of the share, this doesn't cause us
551             # to abandon the whole share.
552             f = Failure(e)
553             log.msg(format="hash failure in block %(segnum)d, from %(share)s",
554                     segnum=segnum, share=repr(self), failure=f,
555                     level=log.WEIRD, parent=self._lp, umid="mZjkqA")
556             for o in observers:
557                 o.notify(state=CORRUPT)
558             self._signal_corruption(f, blockstart, blocklen)
559             self.had_corruption = True
560         # in either case, we've retired this block
561         self._requested_blocks.pop(0)
562         # popping the request keeps us from turning around and wanting the
563         # block again right away
564         return True # got satisfaction
565
566     def _desire(self):
567         segnum, observers = self._active_segnum_and_observers() # maybe None
568
569         # 'want_it' is for data we merely want: we know that we don't really
570         # need it. This includes speculative reads, like the first 1KB of the
571         # share (for the offset table) and the first 2KB of the UEB.
572         #
573         # 'need_it' is for data that, if we have the real offset table, we'll
574         # need. If we are only guessing at the offset table, it's merely
575         # wanted. (The share is abandoned if we can't get data that we really
576         # need).
577         #
578         # 'gotta_gotta_have_it' is for data that we absolutely need,
579         # independent of whether we're still guessing about the offset table:
580         # the version number and the offset table itself.
581         #
582         # Mr. Popeil, I'm in trouble, need your assistance on the double. Aww..
583
584         desire = Spans(), Spans(), Spans()
585         (want_it, need_it, gotta_gotta_have_it) = desire
586
587         self.actual_segment_size = self._node.segment_size # might be updated
588         o = self.actual_offsets or self.guessed_offsets
589         segsize = self.actual_segment_size or self.guessed_segment_size
590         r = self._node._calculate_sizes(segsize)
591
592         if not self.actual_offsets:
593             # all _desire functions add bits to the three desire[] spans
594             self._desire_offsets(desire)
595
596         # we can use guessed offsets as long as this server tolerates
597         # overrun. Otherwise, we must wait for the offsets to arrive before
598         # we try to read anything else.
599         if self.actual_offsets or self._overrun_ok:
600             if not self._node.have_UEB:
601                 self._desire_UEB(desire, o)
602             self._desire_share_hashes(desire, o)
603             if segnum is not None:
604                 # They might be asking for a segment number that is beyond
605                 # what we guess the file contains, but _desire_block_hashes
606                 # and _desire_data will tolerate that.
607                 self._desire_block_hashes(desire, o, segnum)
608                 self._desire_data(desire, o, r, segnum, segsize)
609
610         log.msg("end _desire: want_it=%s need_it=%s gotta=%s"
611                 % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()),
612                 level=log.NOISY, parent=self._lp, umid="IG7CgA")
613         if self.actual_offsets:
614             return (want_it, need_it+gotta_gotta_have_it)
615         else:
616             return (want_it+need_it, gotta_gotta_have_it)
617
618     def _desire_offsets(self, desire):
619         (want_it, need_it, gotta_gotta_have_it) = desire
620         if self._overrun_ok:
621             # easy! this includes version number, sizes, and offsets
622             want_it.add(0, 1024)
623             return
624
625         # v1 has an offset table that lives [0x0,0x24). v2 lives [0x0,0x44).
626         # To be conservative, only request the data that we know lives there,
627         # even if that means more roundtrips.
628
629         gotta_gotta_have_it.add(0, 4)  # version number, always safe
630         version_s = self._received.get(0, 4)
631         if not version_s:
632             return
633         (version,) = struct.unpack(">L", version_s)
634         # The code in _satisfy_offsets will have checked this version
635         # already. There is no code path to get this far with version>2.
636         assert 1 <= version <= 2, "can't get here, version=%d" % version
637         if version == 1:
638             table_start = 0x0c
639             fieldsize = 0x4
640         elif version == 2:
641             table_start = 0x14
642             fieldsize = 0x8
643         offset_table_size = 6 * fieldsize
644         gotta_gotta_have_it.add(table_start, offset_table_size)
645
646     def _desire_UEB(self, desire, o):
647         (want_it, need_it, gotta_gotta_have_it) = desire
648
649         # UEB data is stored as (length,data).
650         if self._overrun_ok:
651             # We can pre-fetch 2kb, which should probably cover it. If it
652             # turns out to be larger, we'll come back here later with a known
653             # length and fetch the rest.
654             want_it.add(o["uri_extension"], 2048)
655             # now, while that is probably enough to fetch the whole UEB, it
656             # might not be, so we need to do the next few steps as well. In
657             # most cases, the following steps will not actually add anything
658             # to need_it
659
660         need_it.add(o["uri_extension"], self._fieldsize)
661         # only use a length if we're sure it's correct, otherwise we'll
662         # probably fetch a huge number
663         if not self.actual_offsets:
664             return
665         UEB_length_s = self._received.get(o["uri_extension"], self._fieldsize)
666         if UEB_length_s:
667             (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s)
668             # we know the length, so make sure we grab everything
669             need_it.add(o["uri_extension"]+self._fieldsize, UEB_length)
670
671     def _desire_share_hashes(self, desire, o):
672         (want_it, need_it, gotta_gotta_have_it) = desire
673
674         if self._node.share_hash_tree.needed_hashes(self._shnum):
675             hashlen = o["uri_extension"] - o["share_hashes"]
676             need_it.add(o["share_hashes"], hashlen)
677
678     def _desire_block_hashes(self, desire, o, segnum):
679         (want_it, need_it, gotta_gotta_have_it) = desire
680
681         # block hash chain
682         for hashnum in self._commonshare.get_desired_block_hashes(segnum):
683             need_it.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
684
685         # ciphertext hash chain
686         for hashnum in self._node.get_desired_ciphertext_hashes(segnum):
687             need_it.add(o["crypttext_hash_tree"]+hashnum*HASH_SIZE, HASH_SIZE)
688
689     def _desire_data(self, desire, o, r, segnum, segsize):
690         if segnum > r["num_segments"]:
691             # they're asking for a segment that's beyond what we think is the
692             # end of the file. We won't get here if we've already learned the
693             # real UEB: _get_satisfaction() will notice the out-of-bounds and
694             # terminate the loop. So we must still be guessing, which means
695             # that they might be correct in asking for such a large segnum.
696             # But if they're right, then our segsize/segnum guess is
697             # certainly wrong, which means we don't know what data blocks to
698             # ask for yet. So don't bother adding anything. When the UEB
699             # comes back and we learn the correct segsize/segnums, we'll
700             # either reject the request or have enough information to proceed
701             # normally. This costs one roundtrip.
702             log.msg("_desire_data: segnum(%d) looks wrong (numsegs=%d)"
703                     % (segnum, r["num_segments"]),
704                     level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ")
705             return
706         (want_it, need_it, gotta_gotta_have_it) = desire
707         tail = (segnum == r["num_segments"]-1)
708         datastart = o["data"]
709         blockstart = datastart + segnum * r["block_size"]
710         blocklen = r["block_size"]
711         if tail:
712             blocklen = r["tail_block_size"]
713         need_it.add(blockstart, blocklen)
714
715     def _send_requests(self, desired):
716         ask = desired - self._pending - self._received.get_spans()
717         log.msg("%s._send_requests, desired=%s, pending=%s, ask=%s" %
718                 (repr(self), desired.dump(), self._pending.dump(), ask.dump()),
719                 level=log.NOISY, parent=self._lp, umid="E94CVA")
720         # XXX At one time, this code distinguished between data blocks and
721         # hashes, and made sure to send (small) requests for hashes before
722         # sending (big) requests for blocks. The idea was to make sure that
723         # all hashes arrive before the blocks, so the blocks can be consumed
724         # and released in a single turn. I removed this for simplicity.
725         # Reconsider the removal: maybe bring it back.
726         ds = self._download_status
727
728         for (start, length) in ask:
729             # TODO: quantize to reasonably-large blocks
730             self._pending.add(start, length)
731             lp = log.msg(format="%(share)s._send_request"
732                          " [%(start)d:+%(length)d]",
733                          share=repr(self),
734                          start=start, length=length,
735                          level=log.NOISY, parent=self._lp, umid="sgVAyA")
736             block_ev = ds.add_block_request(self._server, self._shnum,
737                                             start, length, now())
738             d = self._send_request(start, length)
739             d.addCallback(self._got_data, start, length, block_ev, lp)
740             d.addErrback(self._got_error, start, length, block_ev, lp)
741             d.addCallback(self._trigger_loop)
742             d.addErrback(lambda f:
743                          log.err(format="unhandled error during send_request",
744                                  failure=f, parent=self._lp,
745                                  level=log.WEIRD, umid="qZu0wg"))
746
747     def _send_request(self, start, length):
748         return self._rref.callRemote("read", start, length)
749
750     def _got_data(self, data, start, length, block_ev, lp):
751         block_ev.finished(len(data), now())
752         if not self._alive:
753             return
754         log.msg(format="%(share)s._got_data [%(start)d:+%(length)d] -> %(datalen)d",
755                 share=repr(self), start=start, length=length, datalen=len(data),
756                 level=log.NOISY, parent=lp, umid="5Qn6VQ")
757         self._pending.remove(start, length)
758         self._received.add(start, data)
759
760         # if we ask for [a:c], and we get back [a:b] (b<c), that means we're
761         # never going to get [b:c]. If we really need that data, this block
762         # will never complete. The easiest way to get into this situation is
763         # to hit a share with a corrupted offset table, or one that's somehow
764         # been truncated. On the other hand, when overrun_ok is true, we ask
765         # for data beyond the end of the share all the time (it saves some
766         # RTT when we don't know the length of the share ahead of time). So
767         # not every asked-for-but-not-received byte is fatal.
768         if len(data) < length:
769             self._unavailable.add(start+len(data), length-len(data))
770
771         # XXX if table corruption causes our sections to overlap, then one
772         # consumer (i.e. block hash tree) will pop/remove the data that
773         # another consumer (i.e. block data) mistakenly thinks it needs. It
774         # won't ask for that data again, because the span is in
775         # self._requested. But that span won't be in self._unavailable
776         # because we got it back from the server. TODO: handle this properly
777         # (raise DataUnavailable). Then add sanity-checking
778         # no-overlaps-allowed tests to the offset-table unpacking code to
779         # catch this earlier. XXX
780
781         # accumulate a wanted/needed span (not as self._x, but passed into
782         # desire* functions). manage a pending/in-flight list. when the
783         # requests are sent out, empty/discard the wanted/needed span and
784         # populate/augment the pending list. when the responses come back,
785         # augment either received+data or unavailable.
786
787         # if a corrupt offset table results in double-usage, we'll send
788         # double requests.
789
790         # the wanted/needed span is only "wanted" for the first pass. Once
791         # the offset table arrives, it's all "needed".
792
793     def _got_error(self, f, start, length, block_ev, lp):
794         block_ev.error(now())
795         log.msg(format="error requesting %(start)d+%(length)d"
796                 " from %(server)s for si %(si)s",
797                 start=start, length=length,
798                 server=self._server.get_name(), si=self._si_prefix,
799                 failure=f, parent=lp, level=log.UNUSUAL, umid="BZgAJw")
800         # retire our observers, assuming we won't be able to make any
801         # further progress
802         self._fail(f, log.UNUSUAL)
803
804     def _trigger_loop(self, res):
805         if self._alive:
806             self.schedule_loop()
807         return res
808
809     def _fail(self, f, level=log.WEIRD):
810         log.msg(format="abandoning %(share)s",
811                 share=repr(self), failure=f,
812                 level=level, parent=self._lp, umid="JKM2Og")
813         self._alive = False
814         for (segnum, observers) in self._requested_blocks:
815             for o in observers:
816                 o.notify(state=DEAD, f=f)
817
818
819 class CommonShare:
820     # TODO: defer creation of the hashtree until somebody uses us. There will
821     # be a lot of unused shares, and we shouldn't spend the memory on a large
822     # hashtree unless necessary.
823     """I hold data that is common across all instances of a single share,
824     like sh2 on both servers A and B. This is just the block hash tree.
825     """
826     def __init__(self, best_numsegs, si_prefix, shnum, logparent):
827         self.si_prefix = si_prefix
828         self.shnum = shnum
829
830         # in the beginning, before we have the real UEB, we can only guess at
831         # the number of segments. But we want to ask for block hashes early.
832         # So if we're asked for which block hashes are needed before we know
833         # numsegs for sure, we return a guess.
834         self._block_hash_tree = IncompleteHashTree(best_numsegs)
835         self._block_hash_tree_is_authoritative = False
836         self._block_hash_tree_leaves = best_numsegs
837         self._logparent = logparent
838
839     def __repr__(self):
840         return "CommonShare(%s-sh%d)" % (self.si_prefix, self.shnum)
841
842     def set_authoritative_num_segments(self, numsegs):
843         if self._block_hash_tree_leaves != numsegs:
844             self._block_hash_tree = IncompleteHashTree(numsegs)
845             self._block_hash_tree_leaves = numsegs
846         self._block_hash_tree_is_authoritative = True
847
848     def need_block_hash_root(self):
849         return bool(not self._block_hash_tree[0])
850
851     def set_block_hash_root(self, roothash):
852         assert self._block_hash_tree_is_authoritative
853         self._block_hash_tree.set_hashes({0: roothash})
854
855     def get_desired_block_hashes(self, segnum):
856         if segnum < self._block_hash_tree_leaves:
857             return self._block_hash_tree.needed_hashes(segnum,
858                                                        include_leaf=True)
859
860         # the segnum might be out-of-bounds. Originally it was due to a race
861         # between the receipt of the UEB on one share (from which we learn
862         # the correct number of segments, update all hash trees to the right
863         # size, and queue a BADSEGNUM to the SegmentFetcher) and the delivery
864         # of a new Share to the SegmentFetcher while that BADSEGNUM was
865         # queued (which sends out requests to the stale segnum, now larger
866         # than the hash tree). I fixed that (by making SegmentFetcher.loop
867         # check for a bad segnum at the start of each pass, instead of using
868         # the queued BADSEGNUM or a flag it sets), but just in case this
869         # still happens, I'm leaving the < in place. If it gets hit, there's
870         # a potential lost-progress problem, but I'm pretty sure that it will
871         # get cleared up on the following turn.
872         return []
873
874     def get_needed_block_hashes(self, segnum):
875         assert self._block_hash_tree_is_authoritative
876         # XXX: include_leaf=True needs thought: how did the old downloader do
877         # it? I think it grabbed *all* block hashes and set them all at once.
878         # Since we want to fetch less data, we either need to fetch the leaf
879         # too, or wait to set the block hashes until we've also received the
880         # block itself, so we can hash it too, and set the chain+leaf all at
881         # the same time.
882         return self._block_hash_tree.needed_hashes(segnum, include_leaf=True)
883
884     def process_block_hashes(self, block_hashes):
885         assert self._block_hash_tree_is_authoritative
886         # this may raise BadHashError or NotEnoughHashesError
887         self._block_hash_tree.set_hashes(block_hashes)
888
889     def check_block(self, segnum, block):
890         assert self._block_hash_tree_is_authoritative
891         h = hashutil.block_hash(block)
892         # this may raise BadHashError or NotEnoughHashesError
893         self._block_hash_tree.set_hashes(leaves={segnum: h})
894
895 # TODO: maybe stop using EventStreamObserver: instead, use a Deferred and an
896 # auxilliary OVERDUE callback. Just make sure to get all the messages in the
897 # right order and on the right turns.
898
899 # TODO: we're asking for too much data. We probably don't need
900 # include_leaf=True in the block hash tree or ciphertext hash tree.
901
902 # TODO: we ask for ciphertext hash tree nodes from all shares (whenever
903 # _desire is called while we're missing those nodes), but we only consume it
904 # from the first response, leaving the rest of the data sitting in _received.
905 # This was ameliorated by clearing self._received after each block is
906 # complete.