]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/downloader/share.py
78cce8ed0906d0075a773c9a173284d69f05218a
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / downloader / share.py
1
2 import struct
3 import time
4 now = time.time
5
6 from twisted.python.failure import Failure
7 from foolscap.api import eventually
8 from allmydata.util import base32, log, hashutil, mathutil
9 from allmydata.util.spans import Spans, DataSpans
10 from allmydata.interfaces import HASH_SIZE
11 from allmydata.hashtree import IncompleteHashTree, BadHashError, \
12      NotEnoughHashesError
13
14 from allmydata.immutable.layout import make_write_bucket_proxy
15 from allmydata.util.observer import EventStreamObserver
16 from common import COMPLETE, CORRUPT, DEAD, BADSEGNUM
17
18
19 class LayoutInvalid(Exception):
20     pass
21 class DataUnavailable(Exception):
22     pass
23
24 class Share:
25     """I represent a single instance of a single share (e.g. I reference the
26     shnum2 for share SI=abcde on server xy12t, not the one on server ab45q).
27     I am associated with a CommonShare that remembers data that is held in
28     common among e.g. SI=abcde/shnum2 across all servers. I am also
29     associated with a CiphertextFileNode for e.g. SI=abcde (all shares, all
30     servers).
31     """
32     # this is a specific implementation of IShare for tahoe's native storage
33     # servers. A different backend would use a different class.
34
35     def __init__(self, rref, server_version, verifycap, commonshare, node,
36                  download_status, peerid, shnum, dyhb_rtt, logparent):
37         self._rref = rref
38         self._server_version = server_version
39         self._node = node # holds share_hash_tree and UEB
40         self.actual_segment_size = node.segment_size # might still be None
41         # XXX change node.guessed_segment_size to
42         # node.best_guess_segment_size(), which should give us the real ones
43         # if known, else its guess.
44         self._guess_offsets(verifycap, node.guessed_segment_size)
45         self.actual_offsets = None
46         self._UEB_length = None
47         self._commonshare = commonshare # holds block_hash_tree
48         self._download_status = download_status
49         self._peerid = peerid
50         self._peerid_s = base32.b2a(peerid)[:5]
51         self._storage_index = verifycap.storage_index
52         self._si_prefix = base32.b2a(verifycap.storage_index)[:8]
53         self._shnum = shnum
54         self._dyhb_rtt = dyhb_rtt
55         # self._alive becomes False upon fatal corruption or server error
56         self._alive = True
57         self._lp = log.msg(format="%(share)s created", share=repr(self),
58                            level=log.NOISY, parent=logparent, umid="P7hv2w")
59
60         self._pending = Spans() # request sent but no response received yet
61         self._received = DataSpans() # ACK response received, with data
62         self._unavailable = Spans() # NAK response received, no data
63
64         # any given byte of the share can be in one of four states:
65         #  in: _wanted, _requested, _received
66         #      FALSE    FALSE       FALSE : don't care about it at all
67         #      TRUE     FALSE       FALSE : want it, haven't yet asked for it
68         #      TRUE     TRUE        FALSE : request is in-flight
69         #                                   or didn't get it
70         #      FALSE    TRUE        TRUE  : got it, haven't used it yet
71         #      FALSE    TRUE        FALSE : got it and used it
72         #      FALSE    FALSE       FALSE : block consumed, ready to ask again
73         #
74         # when we request data and get a NAK, we leave it in _requested
75         # to remind ourself to not ask for it again. We don't explicitly
76         # remove it from anything (maybe this should change).
77         #
78         # We retain the hashtrees in the Node, so we leave those spans in
79         # _requested (and never ask for them again, as long as the Node is
80         # alive). But we don't retain data blocks (too big), so when we
81         # consume a data block, we remove it from _requested, so a later
82         # download can re-fetch it.
83
84         self._requested_blocks = [] # (segnum, set(observer2..))
85         ver = server_version["http://allmydata.org/tahoe/protocols/storage/v1"]
86         self._overrun_ok = ver["tolerates-immutable-read-overrun"]
87         # If _overrun_ok and we guess the offsets correctly, we can get
88         # everything in one RTT. If _overrun_ok and we guess wrong, we might
89         # need two RTT (but we could get lucky and do it in one). If overrun
90         # is *not* ok (tahoe-1.3.0 or earlier), we need four RTT: 1=version,
91         # 2=offset table, 3=UEB_length and everything else (hashes, block),
92         # 4=UEB.
93
94         self.had_corruption = False # for unit tests
95
96     def __repr__(self):
97         return "Share(sh%d-on-%s)" % (self._shnum, self._peerid_s)
98
99     def is_alive(self):
100         # XXX: reconsider. If the share sees a single error, should it remain
101         # dead for all time? Or should the next segment try again? This DEAD
102         # state is stored elsewhere too (SegmentFetcher per-share states?)
103         # and needs to be consistent. We clear _alive in self._fail(), which
104         # is called upon a network error, or layout failure, or hash failure
105         # in the UEB or a hash tree. We do not _fail() for a hash failure in
106         # a block, but of course we still tell our callers about
107         # state=CORRUPT so they'll find a different share.
108         return self._alive
109
110     def _guess_offsets(self, verifycap, guessed_segment_size):
111         self.guessed_segment_size = guessed_segment_size
112         size = verifycap.size
113         k = verifycap.needed_shares
114         N = verifycap.total_shares
115         r = self._node._calculate_sizes(guessed_segment_size)
116         # num_segments, block_size/tail_block_size
117         # guessed_segment_size/tail_segment_size/tail_segment_padded
118         share_size = mathutil.div_ceil(size, k)
119         # share_size is the amount of block data that will be put into each
120         # share, summed over all segments. It does not include hashes, the
121         # UEB, or other overhead.
122
123         # use the upload-side code to get this as accurate as possible
124         ht = IncompleteHashTree(N)
125         num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
126         wbp = make_write_bucket_proxy(None, share_size, r["block_size"],
127                                       r["num_segments"], num_share_hashes, 0,
128                                       None)
129         self._fieldsize = wbp.fieldsize
130         self._fieldstruct = wbp.fieldstruct
131         self.guessed_offsets = wbp._offsets
132
133     # called by our client, the SegmentFetcher
134     def get_block(self, segnum):
135         """Add a block number to the list of requests. This will eventually
136         result in a fetch of the data necessary to validate the block, then
137         the block itself. The fetch order is generally
138         first-come-first-served, but requests may be answered out-of-order if
139         data becomes available sooner.
140
141         I return an EventStreamObserver, which has two uses. The first is to
142         call o.subscribe(), which gives me a place to send state changes and
143         eventually the data block. The second is o.cancel(), which removes
144         the request (if it is still active).
145
146         I will distribute the following events through my EventStreamObserver:
147          - state=OVERDUE: ?? I believe I should have had an answer by now.
148                           You may want to ask another share instead.
149          - state=BADSEGNUM: the segnum you asked for is too large. I must
150                             fetch a valid UEB before I can determine this,
151                             so the notification is asynchronous
152          - state=COMPLETE, block=data: here is a valid block
153          - state=CORRUPT: this share contains corrupted data
154          - state=DEAD, f=Failure: the server reported an error, this share
155                                   is unusable
156         """
157         log.msg("%s.get_block(%d)" % (repr(self), segnum),
158                 level=log.NOISY, parent=self._lp, umid="RTo9MQ")
159         assert segnum >= 0
160         o = EventStreamObserver()
161         o.set_canceler(self, "_cancel_block_request")
162         for i,(segnum0,observers) in enumerate(self._requested_blocks):
163             if segnum0 == segnum:
164                 observers.add(o)
165                 break
166         else:
167             self._requested_blocks.append( (segnum, set([o])) )
168         eventually(self.loop)
169         return o
170
171     def _cancel_block_request(self, o):
172         new_requests = []
173         for e in self._requested_blocks:
174             (segnum0, observers) = e
175             observers.discard(o)
176             if observers:
177                 new_requests.append(e)
178         self._requested_blocks = new_requests
179
180     # internal methods
181     def _active_segnum_and_observers(self):
182         if self._requested_blocks:
183             # we only retrieve information for one segment at a time, to
184             # minimize alacrity (first come, first served)
185             return self._requested_blocks[0]
186         return None, []
187
188     def loop(self):
189         if not self._alive:
190             return
191         try:
192             # if any exceptions occur here, kill the download
193             log.msg("%s.loop, reqs=[%s], pending=%s, received=%s,"
194                     " unavailable=%s" %
195                     (repr(self),
196                      ",".join([str(req[0]) for req in self._requested_blocks]),
197                      self._pending.dump(), self._received.dump(),
198                      self._unavailable.dump() ),
199                     level=log.NOISY, parent=self._lp, umid="BaL1zw")
200             self._do_loop()
201             # all exception cases call self._fail(), which clears self._alive
202         except (BadHashError, NotEnoughHashesError, LayoutInvalid), e:
203             # Abandon this share. We do this if we see corruption in the
204             # offset table, the UEB, or a hash tree. We don't abandon the
205             # whole share if we see corruption in a data block (we abandon
206             # just the one block, and still try to get data from other blocks
207             # on the same server). In theory, we could get good data from a
208             # share with a corrupt UEB (by first getting the UEB from some
209             # other share), or corrupt hash trees, but the logic to decide
210             # when this is safe is non-trivial. So for now, give up at the
211             # first sign of corruption.
212             #
213             # _satisfy_*() code which detects corruption should first call
214             # self._signal_corruption(), and then raise the exception.
215             log.msg(format="corruption detected in %(share)s",
216                     share=repr(self),
217                     level=log.UNUSUAL, parent=self._lp, umid="gWspVw")
218             self._fail(Failure(e), log.UNUSUAL)
219         except DataUnavailable, e:
220             # Abandon this share.
221             log.msg(format="need data that will never be available"
222                     " from %s: pending=%s, received=%s, unavailable=%s" %
223                     (repr(self),
224                      self._pending.dump(), self._received.dump(),
225                      self._unavailable.dump() ),
226                     level=log.UNUSUAL, parent=self._lp, umid="F7yJnQ")
227             self._fail(Failure(e), log.UNUSUAL)
228         except BaseException:
229             self._fail(Failure())
230             raise
231         log.msg("%s.loop done, reqs=[%s], pending=%s, received=%s,"
232                 " unavailable=%s" %
233                 (repr(self),
234                  ",".join([str(req[0]) for req in self._requested_blocks]),
235                  self._pending.dump(), self._received.dump(),
236                  self._unavailable.dump() ),
237                 level=log.NOISY, parent=self._lp, umid="9lRaRA")
238
239     def _do_loop(self):
240         # we are (eventually) called after all state transitions:
241         #  new segments added to self._requested_blocks
242         #  new data received from servers (responses to our read() calls)
243         #  impatience timer fires (server appears slow)
244
245         # First, consume all of the information that we currently have, for
246         # all the segments people currently want.
247         while self._get_satisfaction():
248             pass
249
250         # When we get no satisfaction (from the data we've received so far),
251         # we determine what data we desire (to satisfy more requests). The
252         # number of segments is finite, so I can't get no satisfaction
253         # forever.
254         wanted, needed = self._desire()
255
256         # Finally, send out requests for whatever we need (desire minus
257         # have). You can't always get what you want, but if you try
258         # sometimes, you just might find, you get what you need.
259         self._send_requests(wanted + needed)
260
261         # and sometimes you can't even get what you need
262         disappointment = needed & self._unavailable
263         if disappointment.len():
264             self.had_corruption = True
265             raise DataUnavailable("need %s but will never get it" %
266                                   disappointment.dump())
267
268     def _get_satisfaction(self):
269         # return True if we retired a data block, and should therefore be
270         # called again. Return False if we don't retire a data block (even if
271         # we do retire some other data, like hash chains).
272
273         if self.actual_offsets is None:
274             if not self._satisfy_offsets():
275                 # can't even look at anything without the offset table
276                 return False
277
278         if not self._node.have_UEB:
279             if not self._satisfy_UEB():
280                 # can't check any hashes without the UEB
281                 return False
282             # the call to _satisfy_UEB() will immediately set the
283             # authoritative num_segments in all our CommonShares. If we
284             # guessed wrong, we might stil be working on a bogus segnum
285             # (beyond the real range). We catch this and signal BADSEGNUM
286             # before invoking any further code that touches hashtrees.
287         self.actual_segment_size = self._node.segment_size # might be updated
288         assert self.actual_segment_size is not None
289
290         # knowing the UEB means knowing num_segments
291         assert self._node.num_segments is not None
292
293         segnum, observers = self._active_segnum_and_observers()
294         # if segnum is None, we don't really need to do anything (we have no
295         # outstanding readers right now), but we'll fill in the bits that
296         # aren't tied to any particular segment.
297
298         if segnum is not None and segnum >= self._node.num_segments:
299             for o in observers:
300                 o.notify(state=BADSEGNUM)
301             self._requested_blocks.pop(0)
302             return True
303
304         if self._node.share_hash_tree.needed_hashes(self._shnum):
305             if not self._satisfy_share_hash_tree():
306                 # can't check block_hash_tree without a root
307                 return False
308
309         if self._commonshare.need_block_hash_root():
310             block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum)
311             self._commonshare.set_block_hash_root(block_hash_root)
312
313         if segnum is None:
314             return False # we don't want any particular segment right now
315
316         # block_hash_tree
317         needed_hashes = self._commonshare.get_needed_block_hashes(segnum)
318         if needed_hashes:
319             if not self._satisfy_block_hash_tree(needed_hashes):
320                 # can't check block without block_hash_tree
321                 return False
322
323         # ciphertext_hash_tree
324         needed_hashes = self._node.get_needed_ciphertext_hashes(segnum)
325         if needed_hashes:
326             if not self._satisfy_ciphertext_hash_tree(needed_hashes):
327                 # can't check decoded blocks without ciphertext_hash_tree
328                 return False
329
330         # data blocks
331         return self._satisfy_data_block(segnum, observers)
332
333     def _satisfy_offsets(self):
334         version_s = self._received.get(0, 4)
335         if version_s is None:
336             return False
337         (version,) = struct.unpack(">L", version_s)
338         if version == 1:
339             table_start = 0x0c
340             self._fieldsize = 0x4
341             self._fieldstruct = "L"
342         elif version == 2:
343             table_start = 0x14
344             self._fieldsize = 0x8
345             self._fieldstruct = "Q"
346         else:
347             self.had_corruption = True
348             raise LayoutInvalid("unknown version %d (I understand 1 and 2)"
349                                 % version)
350         offset_table_size = 6 * self._fieldsize
351         table_s = self._received.pop(table_start, offset_table_size)
352         if table_s is None:
353             return False
354         fields = struct.unpack(">"+6*self._fieldstruct, table_s)
355         offsets = {}
356         for i,field in enumerate(['data',
357                                   'plaintext_hash_tree', # UNUSED
358                                   'crypttext_hash_tree',
359                                   'block_hashes',
360                                   'share_hashes',
361                                   'uri_extension',
362                                   ] ):
363             offsets[field] = fields[i]
364         self.actual_offsets = offsets
365         log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields),
366                 level=log.NOISY, parent=self._lp, umid="jedQcw")
367         self._received.remove(0, 4) # don't need this anymore
368
369         # validate the offsets a bit
370         share_hashes_size = offsets["uri_extension"] - offsets["share_hashes"]
371         if share_hashes_size < 0 or share_hashes_size % (2+HASH_SIZE) != 0:
372             # the share hash chain is stored as (hashnum,hash) pairs
373             self.had_corruption = True
374             raise LayoutInvalid("share hashes malformed -- should be a"
375                                 " multiple of %d bytes -- not %d" %
376                                 (2+HASH_SIZE, share_hashes_size))
377         block_hashes_size = offsets["share_hashes"] - offsets["block_hashes"]
378         if block_hashes_size < 0 or block_hashes_size % (HASH_SIZE) != 0:
379             # the block hash tree is stored as a list of hashes
380             self.had_corruption = True
381             raise LayoutInvalid("block hashes malformed -- should be a"
382                                 " multiple of %d bytes -- not %d" %
383                                 (HASH_SIZE, block_hashes_size))
384         # we only look at 'crypttext_hash_tree' if the UEB says we're
385         # actually using it. Same with 'plaintext_hash_tree'. This gives us
386         # some wiggle room: a place to stash data for later extensions.
387
388         return True
389
390     def _satisfy_UEB(self):
391         o = self.actual_offsets
392         fsize = self._fieldsize
393         UEB_length_s = self._received.get(o["uri_extension"], fsize)
394         if not UEB_length_s:
395             return False
396         (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s)
397         UEB_s = self._received.pop(o["uri_extension"]+fsize, UEB_length)
398         if not UEB_s:
399             return False
400         self._received.remove(o["uri_extension"], fsize)
401         try:
402             self._node.validate_and_store_UEB(UEB_s)
403             return True
404         except (LayoutInvalid, BadHashError), e:
405             # TODO: if this UEB was bad, we'll keep trying to validate it
406             # over and over again. Only log.err on the first one, or better
407             # yet skip all but the first
408             f = Failure(e)
409             self._signal_corruption(f, o["uri_extension"], fsize+UEB_length)
410             self.had_corruption = True
411             raise
412
413     def _satisfy_share_hash_tree(self):
414         # the share hash chain is stored as (hashnum,hash) tuples, so you
415         # can't fetch just the pieces you need, because you don't know
416         # exactly where they are. So fetch everything, and parse the results
417         # later.
418         o = self.actual_offsets
419         hashlen = o["uri_extension"] - o["share_hashes"]
420         assert hashlen % (2+HASH_SIZE) == 0
421         hashdata = self._received.get(o["share_hashes"], hashlen)
422         if not hashdata:
423             return False
424         share_hashes = {}
425         for i in range(0, hashlen, 2+HASH_SIZE):
426             (hashnum,) = struct.unpack(">H", hashdata[i:i+2])
427             hashvalue = hashdata[i+2:i+2+HASH_SIZE]
428             share_hashes[hashnum] = hashvalue
429         # TODO: if they give us an empty set of hashes,
430         # process_share_hashes() won't fail. We must ensure that this
431         # situation doesn't allow unverified shares through. Manual testing
432         # shows that set_block_hash_root() throws an assert because an
433         # internal node is None instead of an actual hash, but we want
434         # something better. It's probably best to add a method to
435         # IncompleteHashTree which takes a leaf number and raises an
436         # exception unless that leaf is present and fully validated.
437         try:
438             self._node.process_share_hashes(share_hashes)
439             # adds to self._node.share_hash_tree
440         except (BadHashError, NotEnoughHashesError), e:
441             f = Failure(e)
442             self._signal_corruption(f, o["share_hashes"], hashlen)
443             self.had_corruption = True
444             raise
445         self._received.remove(o["share_hashes"], hashlen)
446         return True
447
448     def _signal_corruption(self, f, start, offset):
449         # there was corruption somewhere in the given range
450         reason = "corruption in share[%d-%d): %s" % (start, start+offset,
451                                                      str(f.value))
452         self._rref.callRemoteOnly("advise_corrupt_share", reason)
453
454     def _satisfy_block_hash_tree(self, needed_hashes):
455         o_bh = self.actual_offsets["block_hashes"]
456         block_hashes = {}
457         for hashnum in needed_hashes:
458             hashdata = self._received.get(o_bh+hashnum*HASH_SIZE, HASH_SIZE)
459             if hashdata:
460                 block_hashes[hashnum] = hashdata
461             else:
462                 return False # missing some hashes
463         # note that we don't submit any hashes to the block_hash_tree until
464         # we've gotten them all, because the hash tree will throw an
465         # exception if we only give it a partial set (which it therefore
466         # cannot validate)
467         try:
468             self._commonshare.process_block_hashes(block_hashes)
469         except (BadHashError, NotEnoughHashesError), e:
470             f = Failure(e)
471             hashnums = ",".join([str(n) for n in sorted(block_hashes.keys())])
472             log.msg(format="hash failure in block_hashes=(%(hashnums)s),"
473                     " from %(share)s",
474                     hashnums=hashnums, shnum=self._shnum, share=repr(self),
475                     failure=f, level=log.WEIRD, parent=self._lp, umid="yNyFdA")
476             hsize = max(0, max(needed_hashes)) * HASH_SIZE
477             self._signal_corruption(f, o_bh, hsize)
478             self.had_corruption = True
479             raise
480         for hashnum in needed_hashes:
481             self._received.remove(o_bh+hashnum*HASH_SIZE, HASH_SIZE)
482         return True
483
484     def _satisfy_ciphertext_hash_tree(self, needed_hashes):
485         start = self.actual_offsets["crypttext_hash_tree"]
486         hashes = {}
487         for hashnum in needed_hashes:
488             hashdata = self._received.get(start+hashnum*HASH_SIZE, HASH_SIZE)
489             if hashdata:
490                 hashes[hashnum] = hashdata
491             else:
492                 return False # missing some hashes
493         # we don't submit any hashes to the ciphertext_hash_tree until we've
494         # gotten them all
495         try:
496             self._node.process_ciphertext_hashes(hashes)
497         except (BadHashError, NotEnoughHashesError), e:
498             f = Failure(e)
499             hashnums = ",".join([str(n) for n in sorted(hashes.keys())])
500             log.msg(format="hash failure in ciphertext_hashes=(%(hashnums)s),"
501                     " from %(share)s",
502                     hashnums=hashnums, share=repr(self), failure=f,
503                     level=log.WEIRD, parent=self._lp, umid="iZI0TA")
504             hsize = max(0, max(needed_hashes))*HASH_SIZE
505             self._signal_corruption(f, start, hsize)
506             self.had_corruption = True
507             raise
508         for hashnum in needed_hashes:
509             self._received.remove(start+hashnum*HASH_SIZE, HASH_SIZE)
510         return True
511
512     def _satisfy_data_block(self, segnum, observers):
513         tail = (segnum == self._node.num_segments-1)
514         datastart = self.actual_offsets["data"]
515         blockstart = datastart + segnum * self._node.block_size
516         blocklen = self._node.block_size
517         if tail:
518             blocklen = self._node.tail_block_size
519
520         block = self._received.pop(blockstart, blocklen)
521         if not block:
522             log.msg("no data for block %s (want [%d:+%d])" % (repr(self),
523                                                               blockstart, blocklen),
524                     level=log.NOISY, parent=self._lp, umid="aK0RFw")
525             return False
526         log.msg(format="%(share)s._satisfy_data_block [%(start)d:+%(length)d]",
527                 share=repr(self), start=blockstart, length=blocklen,
528                 level=log.NOISY, parent=self._lp, umid="uTDNZg")
529         # this block is being retired, either as COMPLETE or CORRUPT, since
530         # no further data reads will help
531         assert self._requested_blocks[0][0] == segnum
532         try:
533             self._commonshare.check_block(segnum, block)
534             # hurrah, we have a valid block. Deliver it.
535             for o in observers:
536                 # goes to SegmentFetcher._block_request_activity
537                 o.notify(state=COMPLETE, block=block)
538             # now clear our received data, to dodge the #1170 spans.py
539             # complexity bug
540             self._received = DataSpans()
541         except (BadHashError, NotEnoughHashesError), e:
542             # rats, we have a corrupt block. Notify our clients that they
543             # need to look elsewhere, and advise the server. Unlike
544             # corruption in other parts of the share, this doesn't cause us
545             # to abandon the whole share.
546             f = Failure(e)
547             log.msg(format="hash failure in block %(segnum)d, from %(share)s",
548                     segnum=segnum, share=repr(self), failure=f,
549                     level=log.WEIRD, parent=self._lp, umid="mZjkqA")
550             for o in observers:
551                 o.notify(state=CORRUPT)
552             self._signal_corruption(f, blockstart, blocklen)
553             self.had_corruption = True
554         # in either case, we've retired this block
555         self._requested_blocks.pop(0)
556         # popping the request keeps us from turning around and wanting the
557         # block again right away
558         return True # got satisfaction
559
560     def _desire(self):
561         segnum, observers = self._active_segnum_and_observers() # maybe None
562
563         # 'want_it' is for data we merely want: we know that we don't really
564         # need it. This includes speculative reads, like the first 1KB of the
565         # share (for the offset table) and the first 2KB of the UEB.
566         #
567         # 'need_it' is for data that, if we have the real offset table, we'll
568         # need. If we are only guessing at the offset table, it's merely
569         # wanted. (The share is abandoned if we can't get data that we really
570         # need).
571         #
572         # 'gotta_gotta_have_it' is for data that we absolutely need,
573         # independent of whether we're still guessing about the offset table:
574         # the version number and the offset table itself.
575         #
576         # Mr. Popeil, I'm in trouble, need your assistance on the double. Aww..
577
578         desire = Spans(), Spans(), Spans()
579         (want_it, need_it, gotta_gotta_have_it) = desire
580
581         self.actual_segment_size = self._node.segment_size # might be updated
582         o = self.actual_offsets or self.guessed_offsets
583         segsize = self.actual_segment_size or self.guessed_segment_size
584         r = self._node._calculate_sizes(segsize)
585
586         if not self.actual_offsets:
587             # all _desire functions add bits to the three desire[] spans
588             self._desire_offsets(desire)
589
590         # we can use guessed offsets as long as this server tolerates
591         # overrun. Otherwise, we must wait for the offsets to arrive before
592         # we try to read anything else.
593         if self.actual_offsets or self._overrun_ok:
594             if not self._node.have_UEB:
595                 self._desire_UEB(desire, o)
596             self._desire_share_hashes(desire, o)
597             if segnum is not None:
598                 # They might be asking for a segment number that is beyond
599                 # what we guess the file contains, but _desire_block_hashes
600                 # and _desire_data will tolerate that.
601                 self._desire_block_hashes(desire, o, segnum)
602                 self._desire_data(desire, o, r, segnum, segsize)
603
604         log.msg("end _desire: want_it=%s need_it=%s gotta=%s"
605                 % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()),
606                 level=log.NOISY, parent=self._lp, umid="IG7CgA")
607         if self.actual_offsets:
608             return (want_it, need_it+gotta_gotta_have_it)
609         else:
610             return (want_it+need_it, gotta_gotta_have_it)
611
612     def _desire_offsets(self, desire):
613         (want_it, need_it, gotta_gotta_have_it) = desire
614         if self._overrun_ok:
615             # easy! this includes version number, sizes, and offsets
616             want_it.add(0, 1024)
617             return
618
619         # v1 has an offset table that lives [0x0,0x24). v2 lives [0x0,0x44).
620         # To be conservative, only request the data that we know lives there,
621         # even if that means more roundtrips.
622
623         gotta_gotta_have_it.add(0, 4)  # version number, always safe
624         version_s = self._received.get(0, 4)
625         if not version_s:
626             return
627         (version,) = struct.unpack(">L", version_s)
628         # The code in _satisfy_offsets will have checked this version
629         # already. There is no code path to get this far with version>2.
630         assert 1 <= version <= 2, "can't get here, version=%d" % version
631         if version == 1:
632             table_start = 0x0c
633             fieldsize = 0x4
634         elif version == 2:
635             table_start = 0x14
636             fieldsize = 0x8
637         offset_table_size = 6 * fieldsize
638         gotta_gotta_have_it.add(table_start, offset_table_size)
639
640     def _desire_UEB(self, desire, o):
641         (want_it, need_it, gotta_gotta_have_it) = desire
642
643         # UEB data is stored as (length,data).
644         if self._overrun_ok:
645             # We can pre-fetch 2kb, which should probably cover it. If it
646             # turns out to be larger, we'll come back here later with a known
647             # length and fetch the rest.
648             want_it.add(o["uri_extension"], 2048)
649             # now, while that is probably enough to fetch the whole UEB, it
650             # might not be, so we need to do the next few steps as well. In
651             # most cases, the following steps will not actually add anything
652             # to need_it
653
654         need_it.add(o["uri_extension"], self._fieldsize)
655         # only use a length if we're sure it's correct, otherwise we'll
656         # probably fetch a huge number
657         if not self.actual_offsets:
658             return
659         UEB_length_s = self._received.get(o["uri_extension"], self._fieldsize)
660         if UEB_length_s:
661             (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s)
662             # we know the length, so make sure we grab everything
663             need_it.add(o["uri_extension"]+self._fieldsize, UEB_length)
664
665     def _desire_share_hashes(self, desire, o):
666         (want_it, need_it, gotta_gotta_have_it) = desire
667
668         if self._node.share_hash_tree.needed_hashes(self._shnum):
669             hashlen = o["uri_extension"] - o["share_hashes"]
670             need_it.add(o["share_hashes"], hashlen)
671
672     def _desire_block_hashes(self, desire, o, segnum):
673         (want_it, need_it, gotta_gotta_have_it) = desire
674
675         # block hash chain
676         for hashnum in self._commonshare.get_desired_block_hashes(segnum):
677             need_it.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
678
679         # ciphertext hash chain
680         for hashnum in self._node.get_desired_ciphertext_hashes(segnum):
681             need_it.add(o["crypttext_hash_tree"]+hashnum*HASH_SIZE, HASH_SIZE)
682
683     def _desire_data(self, desire, o, r, segnum, segsize):
684         if segnum > r["num_segments"]:
685             # they're asking for a segment that's beyond what we think is the
686             # end of the file. We won't get here if we've already learned the
687             # real UEB: _get_satisfaction() will notice the out-of-bounds and
688             # terminate the loop. So we must still be guessing, which means
689             # that they might be correct in asking for such a large segnum.
690             # But if they're right, then our segsize/segnum guess is
691             # certainly wrong, which means we don't know what data blocks to
692             # ask for yet. So don't bother adding anything. When the UEB
693             # comes back and we learn the correct segsize/segnums, we'll
694             # either reject the request or have enough information to proceed
695             # normally. This costs one roundtrip.
696             log.msg("_desire_data: segnum(%d) looks wrong (numsegs=%d)"
697                     % (segnum, r["num_segments"]),
698                     level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ")
699             return
700         (want_it, need_it, gotta_gotta_have_it) = desire
701         tail = (segnum == r["num_segments"]-1)
702         datastart = o["data"]
703         blockstart = datastart + segnum * r["block_size"]
704         blocklen = r["block_size"]
705         if tail:
706             blocklen = r["tail_block_size"]
707         need_it.add(blockstart, blocklen)
708
709     def _send_requests(self, desired):
710         ask = desired - self._pending - self._received.get_spans()
711         log.msg("%s._send_requests, desired=%s, pending=%s, ask=%s" %
712                 (repr(self), desired.dump(), self._pending.dump(), ask.dump()),
713                 level=log.NOISY, parent=self._lp, umid="E94CVA")
714         # XXX At one time, this code distinguished between data blocks and
715         # hashes, and made sure to send (small) requests for hashes before
716         # sending (big) requests for blocks. The idea was to make sure that
717         # all hashes arrive before the blocks, so the blocks can be consumed
718         # and released in a single turn. I removed this for simplicity.
719         # Reconsider the removal: maybe bring it back.
720         ds = self._download_status
721
722         for (start, length) in ask:
723             # TODO: quantize to reasonably-large blocks
724             self._pending.add(start, length)
725             lp = log.msg(format="%(share)s._send_request"
726                          " [%(start)d:+%(length)d]",
727                          share=repr(self),
728                          start=start, length=length,
729                          level=log.NOISY, parent=self._lp, umid="sgVAyA")
730             req_ev = ds.add_request_sent(self._peerid, self._shnum,
731                                          start, length, now())
732             d = self._send_request(start, length)
733             d.addCallback(self._got_data, start, length, req_ev, lp)
734             d.addErrback(self._got_error, start, length, req_ev, lp)
735             d.addCallback(self._trigger_loop)
736             d.addErrback(lambda f:
737                          log.err(format="unhandled error during send_request",
738                                  failure=f, parent=self._lp,
739                                  level=log.WEIRD, umid="qZu0wg"))
740
741     def _send_request(self, start, length):
742         return self._rref.callRemote("read", start, length)
743
744     def _got_data(self, data, start, length, req_ev, lp):
745         req_ev.finished(len(data), now())
746         if not self._alive:
747             return
748         log.msg(format="%(share)s._got_data [%(start)d:+%(length)d] -> %(datalen)d",
749                 share=repr(self), start=start, length=length, datalen=len(data),
750                 level=log.NOISY, parent=lp, umid="5Qn6VQ")
751         self._pending.remove(start, length)
752         self._received.add(start, data)
753
754         # if we ask for [a:c], and we get back [a:b] (b<c), that means we're
755         # never going to get [b:c]. If we really need that data, this block
756         # will never complete. The easiest way to get into this situation is
757         # to hit a share with a corrupted offset table, or one that's somehow
758         # been truncated. On the other hand, when overrun_ok is true, we ask
759         # for data beyond the end of the share all the time (it saves some
760         # RTT when we don't know the length of the share ahead of time). So
761         # not every asked-for-but-not-received byte is fatal.
762         if len(data) < length:
763             self._unavailable.add(start+len(data), length-len(data))
764
765         # XXX if table corruption causes our sections to overlap, then one
766         # consumer (i.e. block hash tree) will pop/remove the data that
767         # another consumer (i.e. block data) mistakenly thinks it needs. It
768         # won't ask for that data again, because the span is in
769         # self._requested. But that span won't be in self._unavailable
770         # because we got it back from the server. TODO: handle this properly
771         # (raise DataUnavailable). Then add sanity-checking
772         # no-overlaps-allowed tests to the offset-table unpacking code to
773         # catch this earlier. XXX
774
775         # accumulate a wanted/needed span (not as self._x, but passed into
776         # desire* functions). manage a pending/in-flight list. when the
777         # requests are sent out, empty/discard the wanted/needed span and
778         # populate/augment the pending list. when the responses come back,
779         # augment either received+data or unavailable.
780
781         # if a corrupt offset table results in double-usage, we'll send
782         # double requests.
783
784         # the wanted/needed span is only "wanted" for the first pass. Once
785         # the offset table arrives, it's all "needed".
786
787     def _got_error(self, f, start, length, req_ev, lp):
788         req_ev.finished("error", now())
789         log.msg(format="error requesting %(start)d+%(length)d"
790                 " from %(server)s for si %(si)s",
791                 start=start, length=length,
792                 server=self._peerid_s, si=self._si_prefix,
793                 failure=f, parent=lp, level=log.UNUSUAL, umid="BZgAJw")
794         # retire our observers, assuming we won't be able to make any
795         # further progress
796         self._fail(f, log.UNUSUAL)
797
798     def _trigger_loop(self, res):
799         if self._alive:
800             eventually(self.loop)
801         return res
802
803     def _fail(self, f, level=log.WEIRD):
804         log.msg(format="abandoning %(share)s",
805                 share=repr(self), failure=f,
806                 level=level, parent=self._lp, umid="JKM2Og")
807         self._alive = False
808         for (segnum, observers) in self._requested_blocks:
809             for o in observers:
810                 o.notify(state=DEAD, f=f)
811
812
813 class CommonShare:
814     # TODO: defer creation of the hashtree until somebody uses us. There will
815     # be a lot of unused shares, and we shouldn't spend the memory on a large
816     # hashtree unless necessary.
817     """I hold data that is common across all instances of a single share,
818     like sh2 on both servers A and B. This is just the block hash tree.
819     """
820     def __init__(self, best_numsegs, si_prefix, shnum, logparent):
821         self.si_prefix = si_prefix
822         self.shnum = shnum
823
824         # in the beginning, before we have the real UEB, we can only guess at
825         # the number of segments. But we want to ask for block hashes early.
826         # So if we're asked for which block hashes are needed before we know
827         # numsegs for sure, we return a guess.
828         self._block_hash_tree = IncompleteHashTree(best_numsegs)
829         self._block_hash_tree_is_authoritative = False
830         self._block_hash_tree_leaves = best_numsegs
831         self._logparent = logparent
832
833     def __repr__(self):
834         return "CommonShare(%s-sh%d)" % (self.si_prefix, self.shnum)
835
836     def set_authoritative_num_segments(self, numsegs):
837         if self._block_hash_tree_leaves != numsegs:
838             self._block_hash_tree = IncompleteHashTree(numsegs)
839             self._block_hash_tree_leaves = numsegs
840         self._block_hash_tree_is_authoritative = True
841
842     def need_block_hash_root(self):
843         return bool(not self._block_hash_tree[0])
844
845     def set_block_hash_root(self, roothash):
846         assert self._block_hash_tree_is_authoritative
847         self._block_hash_tree.set_hashes({0: roothash})
848
849     def get_desired_block_hashes(self, segnum):
850         if segnum < self._block_hash_tree_leaves:
851             return self._block_hash_tree.needed_hashes(segnum,
852                                                        include_leaf=True)
853
854         # the segnum might be out-of-bounds. Originally it was due to a race
855         # between the receipt of the UEB on one share (from which we learn
856         # the correct number of segments, update all hash trees to the right
857         # size, and queue a BADSEGNUM to the SegmentFetcher) and the delivery
858         # of a new Share to the SegmentFetcher while that BADSEGNUM was
859         # queued (which sends out requests to the stale segnum, now larger
860         # than the hash tree). I fixed that (by making SegmentFetcher.loop
861         # check for a bad segnum at the start of each pass, instead of using
862         # the queued BADSEGNUM or a flag it sets), but just in case this
863         # still happens, I'm leaving the < in place. If it gets hit, there's
864         # a potential lost-progress problem, but I'm pretty sure that it will
865         # get cleared up on the following turn.
866         return []
867
868     def get_needed_block_hashes(self, segnum):
869         assert self._block_hash_tree_is_authoritative
870         # XXX: include_leaf=True needs thought: how did the old downloader do
871         # it? I think it grabbed *all* block hashes and set them all at once.
872         # Since we want to fetch less data, we either need to fetch the leaf
873         # too, or wait to set the block hashes until we've also received the
874         # block itself, so we can hash it too, and set the chain+leaf all at
875         # the same time.
876         return self._block_hash_tree.needed_hashes(segnum, include_leaf=True)
877
878     def process_block_hashes(self, block_hashes):
879         assert self._block_hash_tree_is_authoritative
880         # this may raise BadHashError or NotEnoughHashesError
881         self._block_hash_tree.set_hashes(block_hashes)
882
883     def check_block(self, segnum, block):
884         assert self._block_hash_tree_is_authoritative
885         h = hashutil.block_hash(block)
886         # this may raise BadHashError or NotEnoughHashesError
887         self._block_hash_tree.set_hashes(leaves={segnum: h})
888
889 # TODO: maybe stop using EventStreamObserver: instead, use a Deferred and an
890 # auxilliary OVERDUE callback. Just make sure to get all the messages in the
891 # right order and on the right turns.
892
893 # TODO: we're asking for too much data. We probably don't need
894 # include_leaf=True in the block hash tree or ciphertext hash tree.
895
896 # TODO: we ask for ciphertext hash tree nodes from all shares (whenever
897 # _desire is called while we're missing those nodes), but we only consume it
898 # from the first response, leaving the rest of the data sitting in _received.
899 # This was ameliorated by clearing self._received after each block is
900 # complete.