]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/downloader/share.py
413f90772f5f14a441b68aceac4bb1fca446fed3
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / downloader / share.py
1
2 import struct
3 import time
4 now = time.time
5
6 from twisted.python.failure import Failure
7 from foolscap.api import eventually
8 from allmydata.util import base32, log, hashutil, mathutil
9 from allmydata.util.spans import Spans, DataSpans
10 from allmydata.interfaces import HASH_SIZE
11 from allmydata.hashtree import IncompleteHashTree, BadHashError, \
12      NotEnoughHashesError
13
14 from allmydata.immutable.layout import make_write_bucket_proxy
15 from allmydata.util.observer import EventStreamObserver
16 from common import COMPLETE, CORRUPT, DEAD, BADSEGNUM
17
18
19 class LayoutInvalid(Exception):
20     pass
21 class DataUnavailable(Exception):
22     pass
23
24 class Share:
25     """I represent a single instance of a single share (e.g. I reference the
26     shnum2 for share SI=abcde on server xy12t, not the one on server ab45q).
27     I am associated with a CommonShare that remembers data that is held in
28     common among e.g. SI=abcde/shnum2 across all servers. I am also
29     associated with a CiphertextFileNode for e.g. SI=abcde (all shares, all
30     servers).
31     """
32     # this is a specific implementation of IShare for tahoe's native storage
33     # servers. A different backend would use a different class.
34
35     def __init__(self, rref, server_version, verifycap, commonshare, node,
36                  download_status, peerid, shnum, logparent):
37         self._rref = rref
38         self._server_version = server_version
39         self._node = node # holds share_hash_tree and UEB
40         self.actual_segment_size = node.segment_size # might still be None
41         # XXX change node.guessed_segment_size to
42         # node.best_guess_segment_size(), which should give us the real ones
43         # if known, else its guess.
44         self._guess_offsets(verifycap, node.guessed_segment_size)
45         self.actual_offsets = None
46         self._UEB_length = None
47         self._commonshare = commonshare # holds block_hash_tree
48         self._download_status = download_status
49         self._peerid = peerid
50         self._peerid_s = base32.b2a(peerid)[:5]
51         self._storage_index = verifycap.storage_index
52         self._si_prefix = base32.b2a(verifycap.storage_index)[:8]
53         self._shnum = shnum
54         # self._alive becomes False upon fatal corruption or server error
55         self._alive = True
56         self._lp = log.msg(format="%(share)s created", share=repr(self),
57                            level=log.NOISY, parent=logparent, umid="P7hv2w")
58
59         self._pending = Spans() # request sent but no response received yet
60         self._received = DataSpans() # ACK response received, with data
61         self._unavailable = Spans() # NAK response received, no data
62
63         # any given byte of the share can be in one of four states:
64         #  in: _wanted, _requested, _received
65         #      FALSE    FALSE       FALSE : don't care about it at all
66         #      TRUE     FALSE       FALSE : want it, haven't yet asked for it
67         #      TRUE     TRUE        FALSE : request is in-flight
68         #                                   or didn't get it
69         #      FALSE    TRUE        TRUE  : got it, haven't used it yet
70         #      FALSE    TRUE        FALSE : got it and used it
71         #      FALSE    FALSE       FALSE : block consumed, ready to ask again
72         #
73         # when we request data and get a NAK, we leave it in _requested
74         # to remind ourself to not ask for it again. We don't explicitly
75         # remove it from anything (maybe this should change).
76         #
77         # We retain the hashtrees in the Node, so we leave those spans in
78         # _requested (and never ask for them again, as long as the Node is
79         # alive). But we don't retain data blocks (too big), so when we
80         # consume a data block, we remove it from _requested, so a later
81         # download can re-fetch it.
82
83         self._requested_blocks = [] # (segnum, set(observer2..))
84         ver = server_version["http://allmydata.org/tahoe/protocols/storage/v1"]
85         self._overrun_ok = ver["tolerates-immutable-read-overrun"]
86         # If _overrun_ok and we guess the offsets correctly, we can get
87         # everything in one RTT. If _overrun_ok and we guess wrong, we might
88         # need two RTT (but we could get lucky and do it in one). If overrun
89         # is *not* ok (tahoe-1.3.0 or earlier), we need four RTT: 1=version,
90         # 2=offset table, 3=UEB_length and everything else (hashes, block),
91         # 4=UEB.
92
93         self.had_corruption = False # for unit tests
94
95     def __repr__(self):
96         return "Share(sh%d-on-%s)" % (self._shnum, self._peerid_s)
97
98     def is_alive(self):
99         # XXX: reconsider. If the share sees a single error, should it remain
100         # dead for all time? Or should the next segment try again? This DEAD
101         # state is stored elsewhere too (SegmentFetcher per-share states?)
102         # and needs to be consistent. We clear _alive in self._fail(), which
103         # is called upon a network error, or layout failure, or hash failure
104         # in the UEB or a hash tree. We do not _fail() for a hash failure in
105         # a block, but of course we still tell our callers about
106         # state=CORRUPT so they'll find a different share.
107         return self._alive
108
109     def _guess_offsets(self, verifycap, guessed_segment_size):
110         self.guessed_segment_size = guessed_segment_size
111         size = verifycap.size
112         k = verifycap.needed_shares
113         N = verifycap.total_shares
114         r = self._node._calculate_sizes(guessed_segment_size)
115         # num_segments, block_size/tail_block_size
116         # guessed_segment_size/tail_segment_size/tail_segment_padded
117         share_size = mathutil.div_ceil(size, k)
118         # share_size is the amount of block data that will be put into each
119         # share, summed over all segments. It does not include hashes, the
120         # UEB, or other overhead.
121
122         # use the upload-side code to get this as accurate as possible
123         ht = IncompleteHashTree(N)
124         num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
125         wbp = make_write_bucket_proxy(None, share_size, r["block_size"],
126                                       r["num_segments"], num_share_hashes, 0,
127                                       None)
128         self._fieldsize = wbp.fieldsize
129         self._fieldstruct = wbp.fieldstruct
130         self.guessed_offsets = wbp._offsets
131
132     # called by our client, the SegmentFetcher
133     def get_block(self, segnum):
134         """Add a block number to the list of requests. This will eventually
135         result in a fetch of the data necessary to validate the block, then
136         the block itself. The fetch order is generally
137         first-come-first-served, but requests may be answered out-of-order if
138         data becomes available sooner.
139
140         I return an EventStreamObserver, which has two uses. The first is to
141         call o.subscribe(), which gives me a place to send state changes and
142         eventually the data block. The second is o.cancel(), which removes
143         the request (if it is still active).
144
145         I will distribute the following events through my EventStreamObserver:
146          - state=OVERDUE: ?? I believe I should have had an answer by now.
147                           You may want to ask another share instead.
148          - state=BADSEGNUM: the segnum you asked for is too large. I must
149                             fetch a valid UEB before I can determine this,
150                             so the notification is asynchronous
151          - state=COMPLETE, block=data: here is a valid block
152          - state=CORRUPT: this share contains corrupted data
153          - state=DEAD, f=Failure: the server reported an error, this share
154                                   is unusable
155         """
156         log.msg("%s.get_block(%d)" % (repr(self), segnum),
157                 level=log.NOISY, parent=self._lp, umid="RTo9MQ")
158         assert segnum >= 0
159         o = EventStreamObserver()
160         o.set_canceler(self, "_cancel_block_request")
161         for i,(segnum0,observers) in enumerate(self._requested_blocks):
162             if segnum0 == segnum:
163                 observers.add(o)
164                 break
165         else:
166             self._requested_blocks.append( (segnum, set([o])) )
167         eventually(self.loop)
168         return o
169
170     def _cancel_block_request(self, o):
171         new_requests = []
172         for e in self._requested_blocks:
173             (segnum0, observers) = e
174             observers.discard(o)
175             if observers:
176                 new_requests.append(e)
177         self._requested_blocks = new_requests
178
179     # internal methods
180     def _active_segnum_and_observers(self):
181         if self._requested_blocks:
182             # we only retrieve information for one segment at a time, to
183             # minimize alacrity (first come, first served)
184             return self._requested_blocks[0]
185         return None, []
186
187     def loop(self):
188         if not self._alive:
189             return
190         try:
191             # if any exceptions occur here, kill the download
192             log.msg("%s.loop, reqs=[%s], pending=%s, received=%s,"
193                     " unavailable=%s" %
194                     (repr(self),
195                      ",".join([str(req[0]) for req in self._requested_blocks]),
196                      self._pending.dump(), self._received.dump(),
197                      self._unavailable.dump() ),
198                     level=log.NOISY, parent=self._lp, umid="BaL1zw")
199             self._do_loop()
200             # all exception cases call self._fail(), which clears self._alive
201         except (BadHashError, NotEnoughHashesError, LayoutInvalid), e:
202             # Abandon this share. We do this if we see corruption in the
203             # offset table, the UEB, or a hash tree. We don't abandon the
204             # whole share if we see corruption in a data block (we abandon
205             # just the one block, and still try to get data from other blocks
206             # on the same server). In theory, we could get good data from a
207             # share with a corrupt UEB (by first getting the UEB from some
208             # other share), or corrupt hash trees, but the logic to decide
209             # when this is safe is non-trivial. So for now, give up at the
210             # first sign of corruption.
211             #
212             # _satisfy_*() code which detects corruption should first call
213             # self._signal_corruption(), and then raise the exception.
214             log.msg(format="corruption detected in %(share)s",
215                     share=repr(self),
216                     level=log.UNUSUAL, parent=self._lp, umid="gWspVw")
217             self._fail(Failure(e), log.UNUSUAL)
218         except DataUnavailable, e:
219             # Abandon this share.
220             log.msg(format="need data that will never be available"
221                     " from %s: pending=%s, received=%s, unavailable=%s" %
222                     (repr(self),
223                      self._pending.dump(), self._received.dump(),
224                      self._unavailable.dump() ),
225                     level=log.UNUSUAL, parent=self._lp, umid="F7yJnQ")
226             self._fail(Failure(e), log.UNUSUAL)
227         except BaseException:
228             self._fail(Failure())
229             raise
230         log.msg("%s.loop done, reqs=[%s], pending=%s, received=%s,"
231                 " unavailable=%s" %
232                 (repr(self),
233                  ",".join([str(req[0]) for req in self._requested_blocks]),
234                  self._pending.dump(), self._received.dump(),
235                  self._unavailable.dump() ),
236                 level=log.NOISY, parent=self._lp, umid="9lRaRA")
237
238     def _do_loop(self):
239         # we are (eventually) called after all state transitions:
240         #  new segments added to self._requested_blocks
241         #  new data received from servers (responses to our read() calls)
242         #  impatience timer fires (server appears slow)
243
244         # First, consume all of the information that we currently have, for
245         # all the segments people currently want.
246         while self._get_satisfaction():
247             pass
248
249         # When we get no satisfaction (from the data we've received so far),
250         # we determine what data we desire (to satisfy more requests). The
251         # number of segments is finite, so I can't get no satisfaction
252         # forever.
253         wanted, needed = self._desire()
254
255         # Finally, send out requests for whatever we need (desire minus
256         # have). You can't always get what you want, but if you try
257         # sometimes, you just might find, you get what you need.
258         self._send_requests(wanted + needed)
259
260         # and sometimes you can't even get what you need
261         disappointment = needed & self._unavailable
262         if disappointment.len():
263             self.had_corruption = True
264             raise DataUnavailable("need %s but will never get it" %
265                                   disappointment.dump())
266
267     def _get_satisfaction(self):
268         # return True if we retired a data block, and should therefore be
269         # called again. Return False if we don't retire a data block (even if
270         # we do retire some other data, like hash chains).
271
272         if self.actual_offsets is None:
273             if not self._satisfy_offsets():
274                 # can't even look at anything without the offset table
275                 return False
276
277         if not self._node.have_UEB:
278             if not self._satisfy_UEB():
279                 # can't check any hashes without the UEB
280                 return False
281         self.actual_segment_size = self._node.segment_size # might be updated
282         assert self.actual_segment_size is not None
283
284         # knowing the UEB means knowing num_segments. Despite the redundancy,
285         # this is the best place to set this. CommonShare.set_numsegs will
286         # ignore duplicate calls.
287         assert self._node.num_segments is not None
288         cs = self._commonshare
289         cs.set_numsegs(self._node.num_segments)
290
291         segnum, observers = self._active_segnum_and_observers()
292         # if segnum is None, we don't really need to do anything (we have no
293         # outstanding readers right now), but we'll fill in the bits that
294         # aren't tied to any particular segment.
295
296         if segnum is not None and segnum >= self._node.num_segments:
297             for o in observers:
298                 o.notify(state=BADSEGNUM)
299             self._requested_blocks.pop(0)
300             return True
301
302         if self._node.share_hash_tree.needed_hashes(self._shnum):
303             if not self._satisfy_share_hash_tree():
304                 # can't check block_hash_tree without a root
305                 return False
306
307         if cs.need_block_hash_root():
308             block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum)
309             cs.set_block_hash_root(block_hash_root)
310
311         if segnum is None:
312             return False # we don't want any particular segment right now
313
314         # block_hash_tree
315         needed_hashes = self._commonshare.get_needed_block_hashes(segnum)
316         if needed_hashes:
317             if not self._satisfy_block_hash_tree(needed_hashes):
318                 # can't check block without block_hash_tree
319                 return False
320
321         # ciphertext_hash_tree
322         needed_hashes = self._node.get_needed_ciphertext_hashes(segnum)
323         if needed_hashes:
324             if not self._satisfy_ciphertext_hash_tree(needed_hashes):
325                 # can't check decoded blocks without ciphertext_hash_tree
326                 return False
327
328         # data blocks
329         return self._satisfy_data_block(segnum, observers)
330
331     def _satisfy_offsets(self):
332         version_s = self._received.get(0, 4)
333         if version_s is None:
334             return False
335         (version,) = struct.unpack(">L", version_s)
336         if version == 1:
337             table_start = 0x0c
338             self._fieldsize = 0x4
339             self._fieldstruct = "L"
340         elif version == 2:
341             table_start = 0x14
342             self._fieldsize = 0x8
343             self._fieldstruct = "Q"
344         else:
345             self.had_corruption = True
346             raise LayoutInvalid("unknown version %d (I understand 1 and 2)"
347                                 % version)
348         offset_table_size = 6 * self._fieldsize
349         table_s = self._received.pop(table_start, offset_table_size)
350         if table_s is None:
351             return False
352         fields = struct.unpack(">"+6*self._fieldstruct, table_s)
353         offsets = {}
354         for i,field in enumerate(['data',
355                                   'plaintext_hash_tree', # UNUSED
356                                   'crypttext_hash_tree',
357                                   'block_hashes',
358                                   'share_hashes',
359                                   'uri_extension',
360                                   ] ):
361             offsets[field] = fields[i]
362         self.actual_offsets = offsets
363         log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields))
364         self._received.remove(0, 4) # don't need this anymore
365
366         # validate the offsets a bit
367         share_hashes_size = offsets["uri_extension"] - offsets["share_hashes"]
368         if share_hashes_size < 0 or share_hashes_size % (2+HASH_SIZE) != 0:
369             # the share hash chain is stored as (hashnum,hash) pairs
370             self.had_corruption = True
371             raise LayoutInvalid("share hashes malformed -- should be a"
372                                 " multiple of %d bytes -- not %d" %
373                                 (2+HASH_SIZE, share_hashes_size))
374         block_hashes_size = offsets["share_hashes"] - offsets["block_hashes"]
375         if block_hashes_size < 0 or block_hashes_size % (HASH_SIZE) != 0:
376             # the block hash tree is stored as a list of hashes
377             self.had_corruption = True
378             raise LayoutInvalid("block hashes malformed -- should be a"
379                                 " multiple of %d bytes -- not %d" %
380                                 (HASH_SIZE, block_hashes_size))
381         # we only look at 'crypttext_hash_tree' if the UEB says we're
382         # actually using it. Same with 'plaintext_hash_tree'. This gives us
383         # some wiggle room: a place to stash data for later extensions.
384
385         return True
386
387     def _satisfy_UEB(self):
388         o = self.actual_offsets
389         fsize = self._fieldsize
390         UEB_length_s = self._received.get(o["uri_extension"], fsize)
391         if not UEB_length_s:
392             return False
393         (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s)
394         UEB_s = self._received.pop(o["uri_extension"]+fsize, UEB_length)
395         if not UEB_s:
396             return False
397         self._received.remove(o["uri_extension"], fsize)
398         try:
399             self._node.validate_and_store_UEB(UEB_s)
400             return True
401         except (LayoutInvalid, BadHashError), e:
402             # TODO: if this UEB was bad, we'll keep trying to validate it
403             # over and over again. Only log.err on the first one, or better
404             # yet skip all but the first
405             f = Failure(e)
406             self._signal_corruption(f, o["uri_extension"], fsize+UEB_length)
407             self.had_corruption = True
408             raise
409
410     def _satisfy_share_hash_tree(self):
411         # the share hash chain is stored as (hashnum,hash) tuples, so you
412         # can't fetch just the pieces you need, because you don't know
413         # exactly where they are. So fetch everything, and parse the results
414         # later.
415         o = self.actual_offsets
416         hashlen = o["uri_extension"] - o["share_hashes"]
417         assert hashlen % (2+HASH_SIZE) == 0
418         hashdata = self._received.get(o["share_hashes"], hashlen)
419         if not hashdata:
420             return False
421         share_hashes = {}
422         for i in range(0, hashlen, 2+HASH_SIZE):
423             (hashnum,) = struct.unpack(">H", hashdata[i:i+2])
424             hashvalue = hashdata[i+2:i+2+HASH_SIZE]
425             share_hashes[hashnum] = hashvalue
426         # TODO: if they give us an empty set of hashes,
427         # process_share_hashes() won't fail. We must ensure that this
428         # situation doesn't allow unverified shares through. Manual testing
429         # shows that set_block_hash_root() throws an assert because an
430         # internal node is None instead of an actual hash, but we want
431         # something better. It's probably best to add a method to
432         # IncompleteHashTree which takes a leaf number and raises an
433         # exception unless that leaf is present and fully validated.
434         try:
435             self._node.process_share_hashes(share_hashes)
436             # adds to self._node.share_hash_tree
437         except (BadHashError, NotEnoughHashesError), e:
438             f = Failure(e)
439             self._signal_corruption(f, o["share_hashes"], hashlen)
440             self.had_corruption = True
441             raise
442         self._received.remove(o["share_hashes"], hashlen)
443         return True
444
445     def _signal_corruption(self, f, start, offset):
446         # there was corruption somewhere in the given range
447         reason = "corruption in share[%d-%d): %s" % (start, start+offset,
448                                                      str(f.value))
449         self._rref.callRemoteOnly("advise_corrupt_share", reason)
450
451     def _satisfy_block_hash_tree(self, needed_hashes):
452         o_bh = self.actual_offsets["block_hashes"]
453         block_hashes = {}
454         for hashnum in needed_hashes:
455             hashdata = self._received.get(o_bh+hashnum*HASH_SIZE, HASH_SIZE)
456             if hashdata:
457                 block_hashes[hashnum] = hashdata
458             else:
459                 return False # missing some hashes
460         # note that we don't submit any hashes to the block_hash_tree until
461         # we've gotten them all, because the hash tree will throw an
462         # exception if we only give it a partial set (which it therefore
463         # cannot validate)
464         try:
465             self._commonshare.process_block_hashes(block_hashes)
466         except (BadHashError, NotEnoughHashesError), e:
467             f = Failure(e)
468             hashnums = ",".join([str(n) for n in sorted(block_hashes.keys())])
469             log.msg(format="hash failure in block_hashes=(%(hashnums)s),"
470                     " from %(share)s",
471                     hashnums=hashnums, shnum=self._shnum, share=repr(self),
472                     failure=f, level=log.WEIRD, parent=self._lp, umid="yNyFdA")
473             hsize = max(0, max(needed_hashes)) * HASH_SIZE
474             self._signal_corruption(f, o_bh, hsize)
475             self.had_corruption = True
476             raise
477         for hashnum in needed_hashes:
478             self._received.remove(o_bh+hashnum*HASH_SIZE, HASH_SIZE)
479         return True
480
481     def _satisfy_ciphertext_hash_tree(self, needed_hashes):
482         start = self.actual_offsets["crypttext_hash_tree"]
483         hashes = {}
484         for hashnum in needed_hashes:
485             hashdata = self._received.get(start+hashnum*HASH_SIZE, HASH_SIZE)
486             if hashdata:
487                 hashes[hashnum] = hashdata
488             else:
489                 return False # missing some hashes
490         # we don't submit any hashes to the ciphertext_hash_tree until we've
491         # gotten them all
492         try:
493             self._node.process_ciphertext_hashes(hashes)
494         except (BadHashError, NotEnoughHashesError), e:
495             f = Failure(e)
496             hashnums = ",".join([str(n) for n in sorted(hashes.keys())])
497             log.msg(format="hash failure in ciphertext_hashes=(%(hashnums)s),"
498                     " from %(share)s",
499                     hashnums=hashnums, share=repr(self), failure=f,
500                     level=log.WEIRD, parent=self._lp, umid="iZI0TA")
501             hsize = max(0, max(needed_hashes))*HASH_SIZE
502             self._signal_corruption(f, start, hsize)
503             self.had_corruption = True
504             raise
505         for hashnum in needed_hashes:
506             self._received.remove(start+hashnum*HASH_SIZE, HASH_SIZE)
507         return True
508
509     def _satisfy_data_block(self, segnum, observers):
510         tail = (segnum == self._node.num_segments-1)
511         datastart = self.actual_offsets["data"]
512         blockstart = datastart + segnum * self._node.block_size
513         blocklen = self._node.block_size
514         if tail:
515             blocklen = self._node.tail_block_size
516
517         block = self._received.pop(blockstart, blocklen)
518         if not block:
519             log.msg("no data for block %s (want [%d:+%d])" % (repr(self),
520                                                               blockstart, blocklen))
521             return False
522         log.msg(format="%(share)s._satisfy_data_block [%(start)d:+%(length)d]",
523                 share=repr(self), start=blockstart, length=blocklen,
524                 level=log.NOISY, parent=self._lp, umid="uTDNZg")
525         # this block is being retired, either as COMPLETE or CORRUPT, since
526         # no further data reads will help
527         assert self._requested_blocks[0][0] == segnum
528         try:
529             self._commonshare.check_block(segnum, block)
530             # hurrah, we have a valid block. Deliver it.
531             for o in observers:
532                 # goes to SegmentFetcher._block_request_activity
533                 o.notify(state=COMPLETE, block=block)
534             # now clear our received data, to dodge the #1170 spans.py
535             # complexity bug
536             self._received = DataSpans()
537         except (BadHashError, NotEnoughHashesError), e:
538             # rats, we have a corrupt block. Notify our clients that they
539             # need to look elsewhere, and advise the server. Unlike
540             # corruption in other parts of the share, this doesn't cause us
541             # to abandon the whole share.
542             f = Failure(e)
543             log.msg(format="hash failure in block %(segnum)d, from %(share)s",
544                     segnum=segnum, share=repr(self), failure=f,
545                     level=log.WEIRD, parent=self._lp, umid="mZjkqA")
546             for o in observers:
547                 o.notify(state=CORRUPT)
548             self._signal_corruption(f, blockstart, blocklen)
549             self.had_corruption = True
550         # in either case, we've retired this block
551         self._requested_blocks.pop(0)
552         # popping the request keeps us from turning around and wanting the
553         # block again right away
554         return True # got satisfaction
555
556     def _desire(self):
557         segnum, observers = self._active_segnum_and_observers() # maybe None
558
559         # 'want_it' is for data we merely want: we know that we don't really
560         # need it. This includes speculative reads, like the first 1KB of the
561         # share (for the offset table) and the first 2KB of the UEB.
562         #
563         # 'need_it' is for data that, if we have the real offset table, we'll
564         # need. If we are only guessing at the offset table, it's merely
565         # wanted. (The share is abandoned if we can't get data that we really
566         # need).
567         #
568         # 'gotta_gotta_have_it' is for data that we absolutely need,
569         # independent of whether we're still guessing about the offset table:
570         # the version number and the offset table itself.
571         #
572         # Mr. Popeil, I'm in trouble, need your assistance on the double. Aww..
573
574         desire = Spans(), Spans(), Spans()
575         (want_it, need_it, gotta_gotta_have_it) = desire
576
577         self.actual_segment_size = self._node.segment_size # might be updated
578         o = self.actual_offsets or self.guessed_offsets
579         segsize = self.actual_segment_size or self.guessed_segment_size
580         r = self._node._calculate_sizes(segsize)
581
582         if not self.actual_offsets:
583             # all _desire functions add bits to the three desire[] spans
584             self._desire_offsets(desire)
585
586         # we can use guessed offsets as long as this server tolerates
587         # overrun. Otherwise, we must wait for the offsets to arrive before
588         # we try to read anything else.
589         if self.actual_offsets or self._overrun_ok:
590             if not self._node.have_UEB:
591                 self._desire_UEB(desire, o)
592             # They might ask for a segment that doesn't look right.
593             # _satisfy() will catch+reject bad segnums once we know the UEB
594             # (and therefore segsize and numsegs), so we'll only fail this
595             # test if we're still guessing. We want to avoid asking the
596             # hashtrees for needed_hashes() for bad segnums. So don't enter
597             # _desire_hashes or _desire_data unless the segnum looks
598             # reasonable.
599             if segnum < r["num_segments"]:
600                 # XXX somehow we're getting here for sh5. we don't yet know
601                 # the actual_segment_size, we're still working off the guess.
602                 # the ciphertext_hash_tree has been corrected, but the
603                 # commonshare._block_hash_tree is still in the guessed state.
604                 self._desire_share_hashes(desire, o)
605                 if segnum is not None:
606                     self._desire_block_hashes(desire, o, segnum)
607                     self._desire_data(desire, o, r, segnum, segsize)
608             else:
609                 log.msg("_desire: segnum(%d) looks wrong (numsegs=%d)"
610                         % (segnum, r["num_segments"]),
611                         level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ")
612
613         log.msg("end _desire: want_it=%s need_it=%s gotta=%s"
614                 % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()))
615         if self.actual_offsets:
616             return (want_it, need_it+gotta_gotta_have_it)
617         else:
618             return (want_it+need_it, gotta_gotta_have_it)
619
620     def _desire_offsets(self, desire):
621         (want_it, need_it, gotta_gotta_have_it) = desire
622         if self._overrun_ok:
623             # easy! this includes version number, sizes, and offsets
624             want_it.add(0, 1024)
625             return
626
627         # v1 has an offset table that lives [0x0,0x24). v2 lives [0x0,0x44).
628         # To be conservative, only request the data that we know lives there,
629         # even if that means more roundtrips.
630
631         gotta_gotta_have_it.add(0, 4)  # version number, always safe
632         version_s = self._received.get(0, 4)
633         if not version_s:
634             return
635         (version,) = struct.unpack(">L", version_s)
636         # The code in _satisfy_offsets will have checked this version
637         # already. There is no code path to get this far with version>2.
638         assert 1 <= version <= 2, "can't get here, version=%d" % version
639         if version == 1:
640             table_start = 0x0c
641             fieldsize = 0x4
642         elif version == 2:
643             table_start = 0x14
644             fieldsize = 0x8
645         offset_table_size = 6 * fieldsize
646         gotta_gotta_have_it.add(table_start, offset_table_size)
647
648     def _desire_UEB(self, desire, o):
649         (want_it, need_it, gotta_gotta_have_it) = desire
650
651         # UEB data is stored as (length,data).
652         if self._overrun_ok:
653             # We can pre-fetch 2kb, which should probably cover it. If it
654             # turns out to be larger, we'll come back here later with a known
655             # length and fetch the rest.
656             want_it.add(o["uri_extension"], 2048)
657             # now, while that is probably enough to fetch the whole UEB, it
658             # might not be, so we need to do the next few steps as well. In
659             # most cases, the following steps will not actually add anything
660             # to need_it
661
662         need_it.add(o["uri_extension"], self._fieldsize)
663         # only use a length if we're sure it's correct, otherwise we'll
664         # probably fetch a huge number
665         if not self.actual_offsets:
666             return
667         UEB_length_s = self._received.get(o["uri_extension"], self._fieldsize)
668         if UEB_length_s:
669             (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s)
670             # we know the length, so make sure we grab everything
671             need_it.add(o["uri_extension"]+self._fieldsize, UEB_length)
672
673     def _desire_share_hashes(self, desire, o):
674         (want_it, need_it, gotta_gotta_have_it) = desire
675
676         if self._node.share_hash_tree.needed_hashes(self._shnum):
677             hashlen = o["uri_extension"] - o["share_hashes"]
678             need_it.add(o["share_hashes"], hashlen)
679
680     def _desire_block_hashes(self, desire, o, segnum):
681         (want_it, need_it, gotta_gotta_have_it) = desire
682
683         # block hash chain
684         for hashnum in self._commonshare.get_needed_block_hashes(segnum):
685             need_it.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
686
687         # ciphertext hash chain
688         for hashnum in self._node.get_needed_ciphertext_hashes(segnum):
689             need_it.add(o["crypttext_hash_tree"]+hashnum*HASH_SIZE, HASH_SIZE)
690
691     def _desire_data(self, desire, o, r, segnum, segsize):
692         (want_it, need_it, gotta_gotta_have_it) = desire
693         tail = (segnum == r["num_segments"]-1)
694         datastart = o["data"]
695         blockstart = datastart + segnum * r["block_size"]
696         blocklen = r["block_size"]
697         if tail:
698             blocklen = r["tail_block_size"]
699         need_it.add(blockstart, blocklen)
700
701     def _send_requests(self, desired):
702         ask = desired - self._pending - self._received.get_spans()
703         log.msg("%s._send_requests, desired=%s, pending=%s, ask=%s" %
704                 (repr(self), desired.dump(), self._pending.dump(), ask.dump()),
705                 level=log.NOISY, parent=self._lp, umid="E94CVA")
706         # XXX At one time, this code distinguished between data blocks and
707         # hashes, and made sure to send (small) requests for hashes before
708         # sending (big) requests for blocks. The idea was to make sure that
709         # all hashes arrive before the blocks, so the blocks can be consumed
710         # and released in a single turn. I removed this for simplicity.
711         # Reconsider the removal: maybe bring it back.
712         ds = self._download_status
713
714         for (start, length) in ask:
715             # TODO: quantize to reasonably-large blocks
716             self._pending.add(start, length)
717             lp = log.msg(format="%(share)s._send_request"
718                          " [%(start)d:+%(length)d]",
719                          share=repr(self),
720                          start=start, length=length,
721                          level=log.NOISY, parent=self._lp, umid="sgVAyA")
722             req_ev = ds.add_request_sent(self._peerid, self._shnum,
723                                          start, length, now())
724             d = self._send_request(start, length)
725             d.addCallback(self._got_data, start, length, req_ev, lp)
726             d.addErrback(self._got_error, start, length, req_ev, lp)
727             d.addCallback(self._trigger_loop)
728             d.addErrback(lambda f:
729                          log.err(format="unhandled error during send_request",
730                                  failure=f, parent=self._lp,
731                                  level=log.WEIRD, umid="qZu0wg"))
732
733     def _send_request(self, start, length):
734         return self._rref.callRemote("read", start, length)
735
736     def _got_data(self, data, start, length, req_ev, lp):
737         req_ev.finished(len(data), now())
738         if not self._alive:
739             return
740         log.msg(format="%(share)s._got_data [%(start)d:+%(length)d] -> %(datalen)d",
741                 share=repr(self), start=start, length=length, datalen=len(data),
742                 level=log.NOISY, parent=lp, umid="5Qn6VQ")
743         self._pending.remove(start, length)
744         self._received.add(start, data)
745
746         # if we ask for [a:c], and we get back [a:b] (b<c), that means we're
747         # never going to get [b:c]. If we really need that data, this block
748         # will never complete. The easiest way to get into this situation is
749         # to hit a share with a corrupted offset table, or one that's somehow
750         # been truncated. On the other hand, when overrun_ok is true, we ask
751         # for data beyond the end of the share all the time (it saves some
752         # RTT when we don't know the length of the share ahead of time). So
753         # not every asked-for-but-not-received byte is fatal.
754         if len(data) < length:
755             self._unavailable.add(start+len(data), length-len(data))
756
757         # XXX if table corruption causes our sections to overlap, then one
758         # consumer (i.e. block hash tree) will pop/remove the data that
759         # another consumer (i.e. block data) mistakenly thinks it needs. It
760         # won't ask for that data again, because the span is in
761         # self._requested. But that span won't be in self._unavailable
762         # because we got it back from the server. TODO: handle this properly
763         # (raise DataUnavailable). Then add sanity-checking
764         # no-overlaps-allowed tests to the offset-table unpacking code to
765         # catch this earlier. XXX
766
767         # accumulate a wanted/needed span (not as self._x, but passed into
768         # desire* functions). manage a pending/in-flight list. when the
769         # requests are sent out, empty/discard the wanted/needed span and
770         # populate/augment the pending list. when the responses come back,
771         # augment either received+data or unavailable.
772
773         # if a corrupt offset table results in double-usage, we'll send
774         # double requests.
775
776         # the wanted/needed span is only "wanted" for the first pass. Once
777         # the offset table arrives, it's all "needed".
778
779     def _got_error(self, f, start, length, req_ev, lp):
780         req_ev.finished("error", now())
781         log.msg(format="error requesting %(start)d+%(length)d"
782                 " from %(server)s for si %(si)s",
783                 start=start, length=length,
784                 server=self._peerid_s, si=self._si_prefix,
785                 failure=f, parent=lp, level=log.UNUSUAL, umid="BZgAJw")
786         # retire our observers, assuming we won't be able to make any
787         # further progress
788         self._fail(f, log.UNUSUAL)
789
790     def _trigger_loop(self, res):
791         if self._alive:
792             eventually(self.loop)
793         return res
794
795     def _fail(self, f, level=log.WEIRD):
796         log.msg(format="abandoning %(share)s",
797                 share=repr(self), failure=f,
798                 level=level, parent=self._lp, umid="JKM2Og")
799         self._alive = False
800         for (segnum, observers) in self._requested_blocks:
801             for o in observers:
802                 o.notify(state=DEAD, f=f)
803
804
805 class CommonShare:
806     """I hold data that is common across all instances of a single share,
807     like sh2 on both servers A and B. This is just the block hash tree.
808     """
809     def __init__(self, guessed_numsegs, si_prefix, shnum, logparent):
810         self.si_prefix = si_prefix
811         self.shnum = shnum
812         # in the beginning, before we have the real UEB, we can only guess at
813         # the number of segments. But we want to ask for block hashes early.
814         # So if we're asked for which block hashes are needed before we know
815         # numsegs for sure, we return a guess.
816         self._block_hash_tree = IncompleteHashTree(guessed_numsegs)
817         self._know_numsegs = False
818         self._logparent = logparent
819
820     def set_numsegs(self, numsegs):
821         if self._know_numsegs:
822             return
823         self._block_hash_tree = IncompleteHashTree(numsegs)
824         self._know_numsegs = True
825
826     def need_block_hash_root(self):
827         return bool(not self._block_hash_tree[0])
828
829     def set_block_hash_root(self, roothash):
830         assert self._know_numsegs
831         self._block_hash_tree.set_hashes({0: roothash})
832
833     def get_needed_block_hashes(self, segnum):
834         # XXX: include_leaf=True needs thought: how did the old downloader do
835         # it? I think it grabbed *all* block hashes and set them all at once.
836         # Since we want to fetch less data, we either need to fetch the leaf
837         # too, or wait to set the block hashes until we've also received the
838         # block itself, so we can hash it too, and set the chain+leaf all at
839         # the same time.
840         return self._block_hash_tree.needed_hashes(segnum, include_leaf=True)
841
842     def process_block_hashes(self, block_hashes):
843         assert self._know_numsegs
844         # this may raise BadHashError or NotEnoughHashesError
845         self._block_hash_tree.set_hashes(block_hashes)
846
847     def check_block(self, segnum, block):
848         assert self._know_numsegs
849         h = hashutil.block_hash(block)
850         # this may raise BadHashError or NotEnoughHashesError
851         self._block_hash_tree.set_hashes(leaves={segnum: h})