import os, struct, time, weakref
from itertools import islice, count
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from twisted.application import service
from foolscap.eventual import eventually
from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \
     IPublishStatus, IRetrieveStatus
from allmydata.util import base32, hashutil, mathutil, idlib, log
from allmydata.uri import WriteableSSKFileURI
from allmydata import hashtree, codec, storage
from allmydata.encode import NotEnoughPeersError
from pycryptopp.publickey import rsa
from pycryptopp.cipher.aes import AES
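# This module implements SDMF (small distributed mutable files): the share
# packing/unpacking helpers below, the Retrieve and Publish state machines,
# the MutableFileNode that client code uses, and the MutableWatcher service
# that collects status objects for both operations.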
class NotMutableError(Exception):
    pass

class NeedMoreDataError(Exception):
    def __init__(self, needed_bytes, encprivkey_offset, encprivkey_length):
        Exception.__init__(self)
        self.needed_bytes = needed_bytes # up through EOF
        self.encprivkey_offset = encprivkey_offset
        self.encprivkey_length = encprivkey_length
    def __repr__(self):
        return "<NeedMoreDataError (%d bytes)>" % self.needed_bytes

class UncoordinatedWriteError(Exception):
    def __repr__(self):
        return "<%s -- You, oh user, tried to change a file or directory at the same time as another process was trying to change it. To avoid data loss, don't do this. Please see docs/write_coordination.html for details.>" % (self.__class__.__name__,)

class CorruptShareError(Exception):
    def __init__(self, peerid, shnum, reason):
        self.args = (peerid, shnum, reason)
        self.peerid = peerid
        self.shnum = shnum
        self.reason = reason
    def __repr__(self):
        short_peerid = idlib.nodeid_b2a(self.peerid)[:8]
        return "<CorruptShareError peerid=%s shnum[%d]: %s>" % (short_peerid,
                                                                self.shnum,
                                                                self.reason)
PREFIX = ">BQ32s16s"             # each version has a different prefix
SIGNED_PREFIX = ">BQ32s16s BBQQ" # this is covered by the signature
HEADER = ">BQ32s16s BBQQ LLLLQQ" # includes offsets
HEADER_LENGTH = struct.calcsize(HEADER)
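# Layout implied by the format strings above: the checkstring/PREFIX is
# version (B), seqnum (Q), root_hash (32s), IV (16s), or 57 bytes. The
# SIGNED_PREFIX adds k (B), N (B), segment_size (Q), and data_length (Q),
# for 75 bytes. HEADER appends six offsets (LLLLQQ: signature,
# share_hash_chain, block_hash_tree, and share_data as 32-bit values,
# enc_privkey and EOF as 64-bit values), so HEADER_LENGTH is 107 bytes.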
def unpack_prefix_and_signature(data):
    assert len(data) >= HEADER_LENGTH
    o = {}
    prefix = data[:struct.calcsize(SIGNED_PREFIX)]

    (version, seqnum, root_hash, IV,
     k, N, segsize, datalen,
     o['signature'], o['share_hash_chain'], o['block_hash_tree'],
     o['share_data'], o['enc_privkey'],
     o['EOF']) = struct.unpack(HEADER, data[:HEADER_LENGTH])

    assert version == 0
    if len(data) < o['share_hash_chain']:
        raise NeedMoreDataError(o['share_hash_chain'],
                                o['enc_privkey'], o['EOF']-o['enc_privkey'])

    pubkey_s = data[HEADER_LENGTH:o['signature']]
    signature = data[o['signature']:o['share_hash_chain']]

    return (seqnum, root_hash, IV, k, N, segsize, datalen,
            pubkey_s, signature, prefix)
def unpack_share(data):
    assert len(data) >= HEADER_LENGTH
    o = {}
    (version, seqnum, root_hash, IV,
     k, N, segsize, datalen,
     o['signature'], o['share_hash_chain'], o['block_hash_tree'],
     o['share_data'], o['enc_privkey'],
     o['EOF']) = struct.unpack(HEADER, data[:HEADER_LENGTH])

    assert version == 0
    if len(data) < o['EOF']:
        raise NeedMoreDataError(o['EOF'],
                                o['enc_privkey'], o['EOF']-o['enc_privkey'])

    pubkey = data[HEADER_LENGTH:o['signature']]
    signature = data[o['signature']:o['share_hash_chain']]
    share_hash_chain_s = data[o['share_hash_chain']:o['block_hash_tree']]
    share_hash_format = ">H32s"
    hsize = struct.calcsize(share_hash_format)
    assert len(share_hash_chain_s) % hsize == 0, len(share_hash_chain_s)
    share_hash_chain = []
    for i in range(0, len(share_hash_chain_s), hsize):
        chunk = share_hash_chain_s[i:i+hsize]
        (hid, h) = struct.unpack(share_hash_format, chunk)
        share_hash_chain.append( (hid, h) )
    share_hash_chain = dict(share_hash_chain)
    block_hash_tree_s = data[o['block_hash_tree']:o['share_data']]
    assert len(block_hash_tree_s) % 32 == 0, len(block_hash_tree_s)
    block_hash_tree = []
    for i in range(0, len(block_hash_tree_s), 32):
        block_hash_tree.append(block_hash_tree_s[i:i+32])

    share_data = data[o['share_data']:o['enc_privkey']]
    enc_privkey = data[o['enc_privkey']:o['EOF']]

    return (seqnum, root_hash, IV, k, N, segsize, datalen,
            pubkey, signature, share_hash_chain, block_hash_tree,
            share_data, enc_privkey)
def pack_checkstring(seqnum, root_hash, IV):
    return struct.pack(PREFIX,
                       0, # version
                       seqnum,
                       root_hash,
                       IV)

def unpack_checkstring(checkstring):
    cs_len = struct.calcsize(PREFIX)
    version, seqnum, root_hash, IV = struct.unpack(PREFIX, checkstring[:cs_len])
    assert version == 0 # TODO: just ignore the share
    return (seqnum, root_hash, IV)

def pack_prefix(seqnum, root_hash, IV,
                required_shares, total_shares,
                segment_size, data_length):
    prefix = struct.pack(SIGNED_PREFIX,
                         0, # version
                         seqnum,
                         root_hash,
                         IV,
                         required_shares,
                         total_shares,
                         segment_size,
                         data_length)
    return prefix

def pack_offsets(verification_key_length, signature_length,
                 share_hash_chain_length, block_hash_tree_length,
                 share_data_length, encprivkey_length):
    post_offset = HEADER_LENGTH
    offsets = {}
    o1 = offsets['signature'] = post_offset + verification_key_length
    o2 = offsets['share_hash_chain'] = o1 + signature_length
    o3 = offsets['block_hash_tree'] = o2 + share_hash_chain_length
    o4 = offsets['share_data'] = o3 + block_hash_tree_length
    o5 = offsets['enc_privkey'] = o4 + share_data_length
    o6 = offsets['EOF'] = o5 + encprivkey_length

    return struct.pack(">LLLLQQ",
                       offsets['signature'],
                       offsets['share_hash_chain'],
                       offsets['block_hash_tree'],
                       offsets['share_data'],
                       offsets['enc_privkey'],
                       offsets['EOF'])
def pack_share(prefix, verification_key, signature,
               share_hash_chain, block_hash_tree,
               share_data, encprivkey):
    share_hash_chain_s = "".join([struct.pack(">H32s", i, share_hash_chain[i])
                                  for i in sorted(share_hash_chain.keys())])
    for h in block_hash_tree:
        assert len(h) == 32
    block_hash_tree_s = "".join(block_hash_tree)

    offsets = pack_offsets(len(verification_key),
                           len(signature),
                           len(share_hash_chain_s),
                           len(block_hash_tree_s),
                           len(share_data),
                           len(encprivkey))
    final_share = "".join([prefix, offsets, verification_key, signature,
                           share_hash_chain_s, block_hash_tree_s,
                           share_data, encprivkey])
    return final_share
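# pack_share() is the inverse of unpack_share(): the packed share is the
# signed prefix, the offset table, then the verification key, signature,
# share hash chain, block hash tree, share data, and finally the encrypted
# private key, in exactly the order implied by pack_offsets().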
class RetrieveStatus:
    implements(IRetrieveStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()

    def get_storage_index(self):
        return self.storage_index
    def using_helper(self):
        return self.helper
    def get_size(self):
        return self.size
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
        self.helper = helper
    def set_size(self, size):
        self.size = size
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
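# RetrieveStatus (and PublishStatus below) are plain record objects: each
# Retrieve or Publish creates one, updates it as it runs, and MutableWatcher
# (at the bottom of this file) keeps the most recent ones around for status
# reporting.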
class Retrieve:
    def __init__(self, filenode):
        self._node = filenode
        self._contents = None
        # if the filenode already has a copy of the pubkey, use it. Otherwise
        # we'll grab a copy from the first peer we talk to.
        self._pubkey = filenode.get_pubkey()
        self._storage_index = filenode.get_storage_index()
        self._readkey = filenode.get_readkey()
        self._last_failure = None
        self._log_number = None
        self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
        num = self._node._client.log("Retrieve(%s): starting" % prefix)
        self._log_number = num
        self._status = RetrieveStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)
    def log(self, msg, **kwargs):
        prefix = self._log_prefix
        num = self._node._client.log("Retrieve(%s): %s" % (prefix, msg),
                                     parent=self._log_number, **kwargs)
        return num

    def log_err(self, f):
        num = log.err(f, parent=self._log_number)
        return num
    def retrieve(self):
        """Retrieve the filenode's current contents. Returns a Deferred that
        fires with a string when the contents have been retrieved."""
        self._running = True

        # 1: make a guess as to how many peers we should send requests to. We
        #    want to hear from k+EPSILON (k because we have to, EPSILON extra
        #    because that helps us resist rollback attacks). [TRADEOFF:
        #    EPSILON>0 means extra work] [TODO: implement EPSILON>0]
        # 2: build the permuted peerlist, taking the first k+E peers
        # 3: send readv requests to all of them in parallel, asking for the
        #    first 2KB of data from all shares
        # 4: when the first of the responses comes back, extract information:
        # 4a: extract the pubkey, hash it, compare against the URI. If this
        #     check fails, log a WEIRD and ignore the peer.
        # 4b: extract the prefix (seqnum, roothash, k, N, segsize, datalength)
        #     and verify the signature on it. If this is wrong, log a WEIRD
        #     and ignore the peer. Save the prefix string in a dict that's
        #     keyed by (seqnum,roothash) and has (prefixstring, sharemap) as
        #     values. We'll use the prefixstring again later to avoid doing
        #     multiple signature checks
        # 4c: extract the share size (offset of the last byte of sharedata).
        #     if it is larger than 2k, send new readv requests to pull down
        #     the data for that share
        # 4d: if the extracted 'k' is more than we guessed, rebuild a larger
        #     permuted peerlist and send out more readv requests.
        # 5: as additional responses come back, extract the prefix and compare
        #    against the ones we've already seen. If they match, add the
        #    peerid to the corresponding sharemap dict
        # 6: [TRADEOFF]: if EPSILON==0, when we get k responses for the
        #    same (seqnum,roothash) key, attempt to reconstruct that data.
        #    if EPSILON>0, wait for k+EPSILON responses, then attempt to
        #    reconstruct the most popular version. If we do not have enough
        #    shares and there are still requests outstanding, wait. If there
        #    are not still requests outstanding (todo: configurable), send
        #    more requests. Never send queries to more than 2*N servers. If
        #    we've run out of servers, fail.
        # 7: if we discover corrupt shares during the reconstruction process,
        #    remove that share from the sharemap and start step #6 again.

        initial_query_count = 5
        # how much data should be read on the first fetch? It would be nice
        # if we could grab small directories in a single RTT. The way we pack
        # dirnodes consumes about 112 bytes per child. The way we pack
        # mutable files puts about 935 bytes of pubkey+sig+hashes, then our
        # data, then about 1216 bytes of encprivkey. So 2kB ought to get us
        # about 9 entries, which seems like a good default.
        self._read_size = 2000

        # we might not know how many shares we need yet.
        self._required_shares = self._node.get_required_shares()
        self._total_shares = self._node.get_total_shares()

        # self._valid_versions is a dictionary in which the keys are
        # 'verinfo' tuples (seqnum, root_hash, IV). Every time we hear about
        # a new potential version of the file, we check its signature, and
        # the valid ones are added to this dictionary. The values of the
        # dictionary are (prefix, sharemap) tuples, where 'prefix' is just
        # the first part of the share (containing the serialized verinfo),
        # for easier comparison. 'sharemap' is a DictOfSets, in which the
        # keys are sharenumbers, and the values are sets of (peerid, data)
        # tuples. There is a (peerid, data) tuple for every instance of a
        # given share that we've seen. The 'data' in this tuple is a full
        # copy of the SDMF share, starting with the \x00 version byte and
        # continuing through the last byte of sharedata.
        self._valid_versions = {}

        # self._valid_shares is a dict mapping (peerid,data) tuples to
        # validated sharedata strings. Each time we examine the hash chains
        # inside a share and validate them against a signed root_hash, we add
        # the share to self._valid_shares . We use this to avoid re-checking
        # the hashes over and over again.
        self._valid_shares = {}

        self._done_deferred = defer.Deferred()

        d = defer.succeed(initial_query_count)
        d.addCallback(self._choose_initial_peers)
        d.addCallback(self._send_initial_requests)
        d.addCallback(self._wait_for_finish)
        return d
    def _wait_for_finish(self, res):
        return self._done_deferred

    def _choose_initial_peers(self, numqueries):
        n = self._node
        full_peerlist = n._client.get_permuted_peers("storage",
                                                     self._storage_index)

        # _peerlist is a list of (peerid,conn) tuples for peers that are
        # worth talking to. This starts with the first numqueries in the
        # permuted list. If that's not enough to get us a recoverable
        # version, we expand this to include the first 2*total_shares peerids
        # (assuming we learn what total_shares is from one of the first
        # numqueries responses).
        self._peerlist = [p for p in islice(full_peerlist, numqueries)]
        # _peerlist_limit is the query limit we used to build this list. If
        # we later increase this limit, it may be useful to re-scan the
        # permuted list.
        self._peerlist_limit = numqueries
        return self._peerlist
    def _send_initial_requests(self, peerlist):
        self._bad_peerids = set()
        self._queries_outstanding = set()
        self._used_peers = set()
        self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
        for (peerid, ss) in peerlist:
            self._queries_outstanding.add(peerid)
            self._do_query(ss, peerid, self._storage_index, self._read_size)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.
        return None

    def _do_query(self, ss, peerid, storage_index, readsize):
        self._queries_outstanding.add(peerid)
        d = ss.callRemote("slot_readv", storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, peerid, readsize, (ss, storage_index))
        d.addErrback(self._query_failed, peerid)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addBoth(self._check_for_done)
        return d
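    # Every query, whether it succeeds or fails, eventually lands in
    # _check_for_done() via the addBoth() above; that method is the single
    # place where we decide to attempt a decode, send more queries, or give
    # up.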
    def _deserialize_pubkey(self, pubkey_s):
        verifier = rsa.create_verifying_key_from_string(pubkey_s)
        return verifier

    def _got_results(self, datavs, peerid, readsize, stuff):
        self._queries_outstanding.discard(peerid)
        self._used_peers.add(peerid)
        if not self._running:
            return

        for shnum,datav in datavs.items():
            data = datav[0]
            try:
                self._got_results_one_share(shnum, data, peerid)
            except NeedMoreDataError, e:
                # ah, just re-send the query then.
                self._read_size = max(self._read_size, e.needed_bytes)
                # TODO: for MDMF, sanity-check self._read_size: don't let one
                # server cause us to try to read gigabytes of data from all
                # other servers.
                (ss, storage_index) = stuff
                self._do_query(ss, peerid, storage_index, self._read_size)
                return
            except CorruptShareError, e:
                # log it and give the other shares a chance to be processed
                f = failure.Failure()
                self.log("bad share: %s %s" % (f, f.value), level=log.WEIRD)
                self._bad_peerids.add(peerid)
                self._last_failure = f
    def _got_results_one_share(self, shnum, data, peerid):
        self.log("_got_results: got shnum #%d from peerid %s"
                 % (shnum, idlib.shortnodeid_b2a(peerid)))

        # this might raise NeedMoreDataError, in which case the rest of
        # the shares are probably short too. _query_failed() will take
        # responsibility for re-issuing the queries with a new length.
        (seqnum, root_hash, IV, k, N, segsize, datalength,
         pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)

        if not self._pubkey:
            fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
            assert len(fingerprint) == 32
            if fingerprint != self._node._fingerprint:
                raise CorruptShareError(peerid, shnum,
                                        "pubkey doesn't match fingerprint")
            self._pubkey = self._deserialize_pubkey(pubkey_s)
            self._node._populate_pubkey(self._pubkey)

        verinfo = (seqnum, root_hash, IV, segsize, datalength)
        if verinfo not in self._valid_versions:
            # it's a new pair. Verify the signature.
            valid = self._pubkey.verify(prefix, signature)
            if not valid:
                raise CorruptShareError(peerid, shnum,
                                        "signature is invalid")
            # ok, it's a valid verinfo. Add it to the list of validated
            # verinfos.
            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                     % (seqnum, base32.b2a(root_hash)[:4],
                        idlib.shortnodeid_b2a(peerid), shnum,
                        k, N, segsize, datalength))
            self._valid_versions[verinfo] = (prefix, DictOfSets())

            # and make a note of the other parameters we've just learned
            # NOTE: Retrieve needs to be refactored to put k,N in the verinfo
            # along with seqnum/etc, to make sure we don't co-mingle shares
            # from differently-encoded versions of the same file.
            if self._required_shares is None:
                self._required_shares = k
                self._node._populate_required_shares(k)
            if self._total_shares is None:
                self._total_shares = N
                self._node._populate_total_shares(N)

        # reject shares that don't match our narrow-minded ideas of what
        # encoding we're going to use. This addresses the immediate needs of
        # ticket #312, by turning the data corruption into unavailability. To
        # get back the availability (i.e. make sure that one weird-encoding
        # share that happens to come back first doesn't make us ignore the
        # rest of the shares), we need to implement the refactoring mentioned
        # in the NOTE above.
        if k != self._required_shares:
            raise CorruptShareError(peerid, shnum,
                                    "share has k=%d, we want k=%d" %
                                    (k, self._required_shares))
        if N != self._total_shares:
            raise CorruptShareError(peerid, shnum,
                                    "share has N=%d, we want N=%d" %
                                    (N, self._total_shares))

        # we've already seen this pair, and checked the signature so we
        # know it's a valid candidate. Accumulate the share info, if
        # there's enough data present. If not, raise NeedMoreDataError,
        # which will trigger a re-fetch.
        _ignored = unpack_share(data)
        self.log(" found enough data to add share contents")
        self._valid_versions[verinfo][1].add(shnum, (peerid, data))
    def _query_failed(self, f, peerid):
        if not self._running:
            return
        self._queries_outstanding.discard(peerid)
        self._used_peers.add(peerid)
        self._last_failure = f
        self._bad_peerids.add(peerid)
        self.log("error during query: %s %s" % (f, f.value), level=log.WEIRD)
    def _check_for_done(self, res):
        if not self._running:
            self.log("ODD: _check_for_done but we're not running")
            return

        versionmap = DictOfSets()
        for verinfo, (prefix, sharemap) in self._valid_versions.items():
            # sharemap is a dict that maps shnums to sets of (peerid,data).
            # len(sharemap) is the number of distinct shares that appear to
            # be available.
            if len(sharemap) >= self._required_shares:
                # this one looks retrievable. TODO: our policy of decoding
                # the first version that we can get is a bit troublesome: in
                # a small grid with a large expansion factor, a single
                # out-of-date server can cause us to retrieve an older
                # version. Fixing this is equivalent to protecting ourselves
                # against a rollback attack, and the best approach is
                # probably to say that we won't do _attempt_decode until:
                #  (we've received at least k+EPSILON shares or
                #   we've received at least k shares and ran out of servers)
                # in that case, identify the verinfos that are decodeable and
                # attempt the one with the highest (seqnum,R) value. If the
                # highest seqnum can't be recovered, only then might we fall
                # back to an older version.
                d = defer.maybeDeferred(self._attempt_decode, verinfo, sharemap)
                def _problem(f):
                    self._last_failure = f
                    if f.check(CorruptShareError):
                        self.log("saw corrupt share, rescheduling",
                                 level=log.WEIRD)
                        # _attempt_decode is responsible for removing the bad
                        # share, so we can just try again
                        eventually(self._check_for_done, None)
                        return
                    return f
                d.addCallbacks(self._done, _problem)
                # TODO: create an errback-routing mechanism to make sure that
                # weird coding errors will cause the retrieval to fail rather
                # than hanging forever. Any otherwise-unhandled exceptions
                # should follow this path. A simple way to test this is to
                # raise BadNameError in _validate_share_and_extract_data .
                return

        # we don't have enough shares yet. Should we send out more queries?
        if self._queries_outstanding:
            # there are some running, so just wait for them to come back.
            # TODO: if our initial guess at k was too low, waiting for these
            # responses before sending new queries will increase our latency,
            # so we could speed things up by sending new requests earlier.
            self.log("ROUTINE: %d queries outstanding" %
                     len(self._queries_outstanding))
            return

        # no more queries are outstanding. Can we send out more? First,
        # should we be looking at more peers?
        self.log("need more peers: N=%s, peerlist=%d peerlist_limit=%d" %
                 (self._total_shares, len(self._peerlist),
                  self._peerlist_limit), level=log.UNUSUAL)
        if self._total_shares is not None:
            search_distance = self._total_shares * 2
        else:
            search_distance = 20
        self.log("search_distance=%d" % search_distance, level=log.UNUSUAL)
        if self._peerlist_limit < search_distance:
            # we might be able to get some more peers from the list
            peers = self._node._client.get_permuted_peers("storage",
                                                          self._storage_index)
            self._peerlist = [p for p in islice(peers, search_distance)]
            self._peerlist_limit = search_distance
            self.log("added peers, peerlist=%d, peerlist_limit=%d"
                     % (len(self._peerlist), self._peerlist_limit),
                     level=log.UNUSUAL)
        # are there any peers on the list that we haven't used?
        new_query_peers = []
        for (peerid, ss) in self._peerlist:
            if peerid not in self._used_peers:
                new_query_peers.append( (peerid, ss) )
                if len(new_query_peers) > 5:
                    # only query in batches of 5. TODO: this is pretty
                    # arbitrary, really I want this to be something like
                    # k - max(known_version_sharecounts) + some extra
                    break
        if new_query_peers:
            self.log("sending %d new queries (read %d bytes)" %
                     (len(new_query_peers), self._read_size), level=log.UNUSUAL)
            for (peerid, ss) in new_query_peers:
                self._do_query(ss, peerid, self._storage_index, self._read_size)
            # we'll retrigger when those queries come back
            return

        # we've used up all the peers we're allowed to search. Failure.
        self.log("ran out of peers", level=log.WEIRD)
        e = NotEnoughPeersError("last failure: %s" % self._last_failure)
        return self._done(failure.Failure(e))
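    # The escalation strategy above: wait for outstanding queries, then widen
    # the peerlist (up to 2*N once N is known), then query not-yet-used peers
    # in small batches, and only report NotEnoughPeersError once every peer
    # we are allowed to ask has been tried.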
    def _attempt_decode(self, verinfo, sharemap):
        # sharemap is a dict which maps shnum to [(peerid,data)..] sets.
        (seqnum, root_hash, IV, segsize, datalength) = verinfo

        assert len(sharemap) >= self._required_shares, len(sharemap)

        shares_s = []
        for shnum in sorted(sharemap.keys()):
            for shareinfo in sharemap[shnum]:
                shares_s.append("#%d" % shnum)
        shares_s = ",".join(shares_s)
        self.log("_attempt_decode: version %d-%s, shares: %s" %
                 (seqnum, base32.b2a(root_hash)[:4], shares_s))

        # first, validate each share that we haven't validated yet. We use
        # self._valid_shares to remember which ones we've already checked.

        shares = {}
        for shnum, shareinfos in sharemap.items():
            assert len(shareinfos) > 0
            for shareinfo in shareinfos:
                # have we already validated the hashes on this share?
                if shareinfo not in self._valid_shares:
                    # nope: must check the hashes and extract the actual data
                    (peerid,data) = shareinfo
                    try:
                        # The (seqnum+root_hash+IV) tuple for this share was
                        # already verified: specifically, all shares in the
                        # sharemap have a (seqnum+root_hash+IV) pair that was
                        # present in a validly signed prefix. The remainder
                        # of the prefix for this particular share has *not*
                        # been validated, but we don't care since we don't
                        # use it. self._validate_share() is required to check
                        # the hashes on the share data (and hash chains) to
                        # make sure they match root_hash, but is not required
                        # (and is in fact prohibited, because we don't
                        # validate the prefix on all shares) from using
                        # anything else in the share.
                        validator = self._validate_share_and_extract_data
                        sharedata = validator(peerid, root_hash, shnum, data)
                        assert isinstance(sharedata, str)
                    except CorruptShareError, e:
                        self.log("share was corrupt: %s" % e, level=log.WEIRD)
                        sharemap[shnum].discard(shareinfo)
                        if not sharemap[shnum]:
                            # remove the key so the test in _check_for_done
                            # can accurately decide that we don't have enough
                            # shares to try again right now.
                            del sharemap[shnum]
                        # If there are enough remaining shares,
                        # _check_for_done() will try again
                        raise
                    # share is valid: remember it so we won't need to check
                    # (or extract) it again
                    self._valid_shares[shareinfo] = sharedata

                # the share is now in _valid_shares, so just copy over the
                # extracted data
                shares[shnum] = self._valid_shares[shareinfo]

        # now that the big loop is done, all shares in the sharemap are
        # valid, and they're all for the same seqnum+root_hash version, so
        # it's now down to doing FEC and decrypt.
        assert len(shares) >= self._required_shares, len(shares)
        d = defer.maybeDeferred(self._decode, shares, segsize, datalength)
        d.addCallback(self._decrypt, IV, seqnum, root_hash)
        return d
    def _validate_share_and_extract_data(self, peerid, root_hash, shnum, data):
        # 'data' is the whole SDMF share
        self.log("_validate_share_and_extract_data[%d]" % shnum)
        assert data[0] == "\x00"
        pieces = unpack_share(data)
        (seqnum, root_hash_copy, IV, k, N, segsize, datalen,
         pubkey, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        assert isinstance(share_data, str)
        # build the block hash tree. SDMF has only one leaf.
        leaves = [hashutil.block_hash(share_data)]
        t = hashtree.HashTree(leaves)
        if list(t) != block_hash_tree:
            raise CorruptShareError(peerid, shnum, "block hash tree failure")
        share_hash_leaf = t[0]
        t2 = hashtree.IncompleteHashTree(N)
        # root_hash was checked by the signature
        t2.set_hashes({0: root_hash})
        try:
            t2.set_hashes(hashes=share_hash_chain,
                          leaves={shnum: share_hash_leaf})
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), e:
            msg = "corrupt hashes: %s" % (e,)
            raise CorruptShareError(peerid, shnum, msg)
        self.log(" data valid! len=%d" % len(share_data))
        return share_data
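    # Validation is two-level: each share carries a block hash tree whose
    # single leaf covers its own share_data, and the share hash chain ties
    # that share's leaf into the file-wide share hash tree whose root
    # (root_hash) was covered by the signature we checked earlier. A share
    # failing either check is rejected with CorruptShareError.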
    def _decode(self, shares_dict, segsize, datalength):
        # we ought to know these values by now
        assert self._required_shares is not None
        assert self._total_shares is not None

        # shares_dict is a dict mapping shnum to share data, but the codec
        # wants two parallel lists.
        shareids = []; shares = []
        for shareid, share in shares_dict.items():
            shareids.append(shareid)
            shares.append(share)

        assert len(shareids) >= self._required_shares, len(shareids)
        # zfec really doesn't want extra shares
        shareids = shareids[:self._required_shares]
        shares = shares[:self._required_shares]

        fec = codec.CRSDecoder()
        params = "%d-%d-%d" % (segsize,
                               self._required_shares, self._total_shares)
        fec.set_serialized_params(params)

        self.log("params %s, we have %d shares" % (params, len(shares)))
        self.log("about to decode, shareids=%s" % (shareids,))
        d = defer.maybeDeferred(fec.decode, shares, shareids)
        def _done(buffers):
            self.log(" decode done, %d buffers" % len(buffers))
            segment = "".join(buffers)
            self.log(" joined length %d, datalength %d" %
                     (len(segment), datalength))
            segment = segment[:datalength]
            self.log(" segment len=%d" % len(segment))
            return segment
        def _err(f):
            self.log(" decode failed: %s" % f)
            return f
        d.addCallback(_done)
        d.addErrback(_err)
        return d
    def _decrypt(self, crypttext, IV, seqnum, root_hash):
        key = hashutil.ssk_readkey_data_hash(IV, self._readkey)
        decryptor = AES(key)
        plaintext = decryptor.process(crypttext)
        # it worked, so record the seqnum and root_hash for next time
        self._node._populate_seqnum(seqnum)
        self._node._populate_root_hash(root_hash)
        return plaintext

    def _done(self, contents):
        self._running = False
        self._status.set_active(False)
        self._status.set_status("Done")
        self._status.set_progress(1.0)
        self._status.set_size(len(contents))
        eventually(self._done_deferred.callback, contents)

    def get_status(self):
        return self._status


class DictOfSets(dict):
    def add(self, key, value):
        if key in self:
            self[key].add(value)
        else:
            self[key] = set([value])
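# DictOfSets is a small convenience class: a dict whose add() accumulates
# values into per-key sets. Illustrative usage only:
#   m = DictOfSets()
#   m.add(0, "peerA")
#   m.add(0, "peerB")
#   m[0] == set(["peerA", "peerB"])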
class PublishStatus:
    implements(IPublishStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()

    def get_storage_index(self):
        return self.storage_index
    def using_helper(self):
        return self.helper
    def get_size(self):
        return self.size
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
        self.helper = helper
    def set_size(self, size):
        self.size = size
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
class Publish:
    """I represent a single act of publishing the mutable file to the grid."""

    def __init__(self, filenode):
        self._node = filenode
        self._storage_index = self._node.get_storage_index()
        self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
        num = self._node._client.log("Publish(%s): starting" % prefix)
        self._log_number = num
        self._status = PublishStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)

    def log(self, *args, **kwargs):
        if 'parent' not in kwargs:
            kwargs['parent'] = self._log_number
        num = log.msg(*args, **kwargs)
        return num

    def log_err(self, *args, **kwargs):
        if 'parent' not in kwargs:
            kwargs['parent'] = self._log_number
        num = log.err(*args, **kwargs)
        return num
    def publish(self, newdata):
        """Publish the filenode's current contents. Returns a Deferred that
        fires (with None) when the publish has done as much work as it's ever
        going to do, or errbacks with UncoordinatedWriteError if it detects a
        simultaneous write.
        """

        # 1: generate shares (SDMF: files are small, so we can do it in RAM)
        # 2: perform peer selection, get candidate servers
        #  2a: send queries to n+epsilon servers, to determine current shares
        #  2b: based upon responses, create target map
        # 3: send slot_testv_and_readv_and_writev messages
        # 4: as responses return, update share-dispatch table
        # 4a: may need to run recovery algorithm
        # 5: when enough responses are back, we're done

        self.log("starting publish, datalen is %s" % len(newdata))
        self._started = time.time()
        self._status.set_size(len(newdata))

        self._writekey = self._node.get_writekey()
        assert self._writekey, "need write capability to publish"

        old_roothash = self._node._current_roothash
        old_seqnum = self._node._current_seqnum
        assert old_seqnum is not None, "must read before replace"
        self._new_seqnum = old_seqnum + 1

        # read-before-replace also guarantees these fields are available
        readkey = self._node.get_readkey()
        required_shares = self._node.get_required_shares()
        total_shares = self._node.get_total_shares()
        self._pubkey = self._node.get_pubkey()

        # these two may not be, we might have to get them from the first peer
        self._privkey = self._node.get_privkey()
        self._encprivkey = self._node.get_encprivkey()

        IV = os.urandom(16)

        # we read only 1KB because all we generally care about is the seqnum
        # ("prefix") info, so we know which shares are where. We need to get
        # the privkey from somebody, which means reading more like 3KB, but
        # the code in _obtain_privkey will ensure that we manage that even if
        # we need an extra roundtrip. TODO: arrange to read 3KB from one peer
        # who is likely to hold a share, so we can avoid the latency of that
        # extra roundtrip. 3KB would get us the encprivkey from a dirnode
        # with up to 7 entries, allowing us to make an update in 2 RTT
        # instead of 3.
        self._read_size = 1000

        d = defer.succeed(total_shares)
        d.addCallback(self._query_peers)
        d.addCallback(self._obtain_privkey)

        d.addCallback(self._encrypt_and_encode, newdata, readkey, IV,
                      required_shares, total_shares)
        d.addCallback(self._generate_shares, self._new_seqnum, IV)

        d.addCallback(self._send_shares, IV)
        d.addCallback(self._maybe_recover)
        d.addCallback(self._done)
        return d
    def _query_peers(self, total_shares):
        self.log("_query_peers")

        storage_index = self._storage_index

        # In 0.7.0, we went through extra work to make sure that we include
        # ourselves in the peerlist, mainly to match Retrieve (which did the
        # same thing). With the post-0.7.0 Introducer refactoring, we got rid
        # of the include-myself flags, and standardized on the
        # uploading/downloading node not being special.

        # One nice feature of the old approach was that by putting a share on
        # the local storage server, we're more likely to be able to retrieve
        # a copy of the encrypted private key (even if all the old servers
        # have gone away), so we can regenerate new shares even if we can't
        # retrieve the old contents. This need will eventually go away when
        # we switch to DSA-based mutable files (which store the private key
        # in the URI).

        peerlist = self._node._client.get_permuted_peers("storage",
                                                         storage_index)

        current_share_peers = DictOfSets()
        reachable_peers = {}
        # list of (peerid, shnum, offset, length) where the encprivkey might
        # be found
        self._encprivkey_shares = []

        EPSILON = total_shares / 2
        #partial_peerlist = islice(peerlist, total_shares + EPSILON)
        partial_peerlist = peerlist[:total_shares+EPSILON]

        self._storage_servers = {}

        dl = []
        for permutedid, (peerid, ss) in enumerate(partial_peerlist):
            self._storage_servers[peerid] = ss
            d = self._do_query(ss, peerid, storage_index)
            d.addCallback(self._got_query_results,
                          peerid, permutedid,
                          reachable_peers, current_share_peers)
            dl.append(d)
        d = defer.DeferredList(dl)
        d.addCallback(self._got_all_query_results,
                      total_shares, reachable_peers,
                      current_share_peers)
        # TODO: add an errback too, probably to ignore that peer

        # TODO: if we can't get a privkey from these servers, consider
        # looking farther afield. Be aware of the old 0.7.0 behavior that
        # causes us to create our initial directory before we've connected to
        # anyone but ourselves.. those old directories may not be
        # retrievable if our own server is no longer in the early part of
        # the permuted peerlist.
        return d
    def _do_query(self, ss, peerid, storage_index):
        self.log("querying %s" % idlib.shortnodeid_b2a(peerid))
        d = ss.callRemote("slot_readv",
                          storage_index, [], [(0, self._read_size)])
        return d
    def _got_query_results(self, datavs, peerid, permutedid,
                           reachable_peers, current_share_peers):

        lp = self.log(format="_got_query_results from %(peerid)s",
                      peerid=idlib.shortnodeid_b2a(peerid))
        assert isinstance(datavs, dict)
        reachable_peers[peerid] = permutedid
        if not datavs:
            self.log("peer has no shares", parent=lp)
        for shnum, datav in datavs.items():
            lp2 = self.log("peer has shnum %d" % shnum, parent=lp)
            assert len(datav) == 1
            data = datav[0]
            # We want (seqnum, root_hash, IV) from all servers to know what
            # versions we are replacing. We want the encprivkey from one
            # server (assuming it's valid) so we know our own private key, so
            # we can sign our update. SDMF: read the whole share from each
            # server. TODO: later we can optimize this to transfer less data.

            # we assume that we have enough data to extract the signature.
            # TODO: if this raises NeedMoreDataError, arrange to do another
            # query.
            r = unpack_prefix_and_signature(data)
            (seqnum, root_hash, IV, k, N, segsize, datalen,
             pubkey_s, signature, prefix) = r

            # self._pubkey is present because we require read-before-replace
            valid = self._pubkey.verify(prefix, signature)
            if not valid:
                self.log(format="bad signature from %(peerid)s shnum %(shnum)d",
                         peerid=idlib.shortnodeid_b2a(peerid), shnum=shnum,
                         parent=lp2, level=log.WEIRD)
                continue
            self.log(format="peer has goodsig shnum %(shnum)d seqnum %(seqnum)d",
                     shnum=shnum, seqnum=seqnum,
                     parent=lp2, level=log.NOISY)

            share = (shnum, seqnum, root_hash)
            current_share_peers.add(shnum, (peerid, seqnum, root_hash) )

            if not self._privkey:
                self._try_to_extract_privkey(data, peerid, shnum)
    def _try_to_extract_privkey(self, data, peerid, shnum):
        try:
            r = unpack_share(data)
        except NeedMoreDataError, e:
            # this share won't help us. oh well.
            offset = e.encprivkey_offset
            length = e.encprivkey_length
            self.log("shnum %d on peerid %s: share was too short (%dB) "
                     "to get the encprivkey; [%d:%d] ought to hold it" %
                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
                      offset, offset+length))
            # NOTE: if uncoordinated writes are taking place, someone might
            # change the share (and most probably move the encprivkey) before
            # we get a chance to do one of these reads and fetch it. This
            # will cause us to see a NotEnoughPeersError(unable to fetch
            # privkey) instead of an UncoordinatedWriteError . This is a
            # nuisance, but it will go away when we move to DSA-based mutable
            # files (since the privkey will be small enough to fit in the
            # URI).

            self._encprivkey_shares.append( (peerid, shnum, offset, length) )
            return

        (seqnum, root_hash, IV, k, N, segsize, datalen,
         pubkey, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = r

        return self._try_to_validate_privkey(enc_privkey, peerid, shnum)

    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum):
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._writekey:
            self.log("invalid privkey from %s shnum %d" %
                     (idlib.nodeid_b2a(peerid)[:8], shnum), level=log.WEIRD)
            return

        # it's good
        self.log("got valid privkey from shnum %d on peerid %s" %
                 (shnum, idlib.shortnodeid_b2a(peerid)))
        self._privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._encprivkey = enc_privkey
        self._node._populate_encprivkey(self._encprivkey)
        self._node._populate_privkey(self._privkey)
    def _got_all_query_results(self, res,
                               total_shares, reachable_peers,
                               current_share_peers):
        self.log("_got_all_query_results")

        # now that we know everything about the shares currently out there,
        # decide where to place the new shares.

        # if an old share X is on a node, put the new share X there too.
        # TODO: 1: redistribute shares to achieve one-per-peer, by copying
        #       shares from existing peers to new (less-crowded) ones. The
        #       old shares must still be updated.
        # TODO: 2: move those shares instead of copying them, to reduce future
        #       update work

        # if log.recording_noisy
        logmsg = []
        for shnum in range(total_shares):
            logmsg2 = []
            for oldplace in current_share_peers.get(shnum, []):
                (peerid, seqnum, R) = oldplace
                logmsg2.append("%s:#%d:R=%s" % (idlib.shortnodeid_b2a(peerid),
                                                seqnum, base32.b2a(R)[:4]))
            logmsg.append("sh%d on (%s)" % (shnum, "/".join(logmsg2)))
        self.log("sharemap: %s" % (", ".join(logmsg)), level=log.NOISY)
        self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
                 level=log.NOISY)

        shares_needing_homes = range(total_shares)
        target_map = DictOfSets() # maps shnum to set((peerid,oldseqnum,oldR))
        shares_per_peer = DictOfSets()
        for shnum in range(total_shares):
            for oldplace in current_share_peers.get(shnum, []):
                (peerid, seqnum, R) = oldplace
                if seqnum >= self._new_seqnum:
                    self.log("somebody has a newer sequence number than what we were uploading",
                             level=log.WEIRD)
                    self.log(format="peerid=%(peerid)s, theirs=%(seqnum)d, mine=%(new_seqnum)d",
                             peerid=idlib.shortnodeid_b2a(peerid),
                             seqnum=seqnum,
                             new_seqnum=self._new_seqnum)
                    raise UncoordinatedWriteError("somebody has a newer sequence number than us")
                target_map.add(shnum, oldplace)
                shares_per_peer.add(peerid, shnum)
                if shnum in shares_needing_homes:
                    shares_needing_homes.remove(shnum)

        # now choose homes for the remaining shares. We prefer peers with the
        # fewest target shares, then peers with the lowest permuted index. If
        # there are no shares already in place, this will assign them
        # one-per-peer in the normal permuted order.
        while shares_needing_homes:
            if not reachable_peers:
                prefix = storage.si_b2a(self._node.get_storage_index())[:5]
                raise NotEnoughPeersError("ran out of peers during upload of (%s); shares_needing_homes: %s, reachable_peers: %s" % (prefix, shares_needing_homes, reachable_peers,))
            shnum = shares_needing_homes.pop(0)
            possible_homes = reachable_peers.keys()
            possible_homes.sort(lambda a,b:
                                cmp( (len(shares_per_peer.get(a, [])),
                                      reachable_peers[a]),
                                     (len(shares_per_peer.get(b, [])),
                                      reachable_peers[b]) ))
            target_peerid = possible_homes[0]
            target_map.add(shnum, (target_peerid, None, None) )
            shares_per_peer.add(target_peerid, shnum)

        assert not shares_needing_homes

        target_info = (target_map, shares_per_peer)
        return target_info
    def _obtain_privkey(self, target_info):
        # make sure we've got a copy of our private key.
        if self._privkey:
            # Must have picked it up during _query_peers. We're good to go.
            return target_info

        # Nope, we haven't managed to grab a copy, and we still need it. Ask
        # peers one at a time until we get a copy. Only bother asking peers
        # who've admitted to holding a share.

        target_map, shares_per_peer = target_info
        # pull shares from self._encprivkey_shares
        if not self._encprivkey_shares:
            raise NotEnoughPeersError("Unable to find a copy of the privkey")

        (peerid, shnum, offset, length) = self._encprivkey_shares.pop(0)
        ss = self._storage_servers[peerid]
        self.log("trying to obtain privkey from %s shnum %d" %
                 (idlib.shortnodeid_b2a(peerid), shnum))
        d = self._do_privkey_query(ss, peerid, shnum, offset, length)
        d.addErrback(self.log_err)
        d.addCallback(lambda res: self._obtain_privkey(target_info))
        return d

    def _do_privkey_query(self, rref, peerid, shnum, offset, length):
        d = rref.callRemote("slot_readv", self._storage_index,
                            [shnum], [(offset, length)] )
        d.addCallback(self._privkey_query_response, peerid, shnum)
        return d

    def _privkey_query_response(self, datav, peerid, shnum):
        data = datav[shnum][0]
        self._try_to_validate_privkey(data, peerid, shnum)
    def _encrypt_and_encode(self, target_info,
                            newdata, readkey, IV,
                            required_shares, total_shares):
        self.log("_encrypt_and_encode")

        key = hashutil.ssk_readkey_data_hash(IV, readkey)
        enc = AES(key)
        crypttext = enc.process(newdata)
        assert len(crypttext) == len(newdata)

        # now apply FEC
        self.MAX_SEGMENT_SIZE = 1024*1024
        data_length = len(crypttext)

        segment_size = min(self.MAX_SEGMENT_SIZE, len(crypttext))
        # this must be a multiple of self.required_shares
        segment_size = mathutil.next_multiple(segment_size, required_shares)
        if segment_size:
            self.num_segments = mathutil.div_ceil(len(crypttext), segment_size)
        else:
            self.num_segments = 0
        assert self.num_segments in [0, 1,] # SDMF restrictions
        fec = codec.CRSEncoder()
        fec.set_params(segment_size, required_shares, total_shares)
        piece_size = fec.get_block_size()
        crypttext_pieces = [None] * required_shares
        for i in range(len(crypttext_pieces)):
            offset = i * piece_size
            piece = crypttext[offset:offset+piece_size]
            piece = piece + "\x00"*(piece_size - len(piece)) # padding
            crypttext_pieces[i] = piece
            assert len(piece) == piece_size

        d = fec.encode(crypttext_pieces)
        d.addCallback(lambda shares_and_shareids:
                      (shares_and_shareids,
                       required_shares, total_shares,
                       segment_size, data_length,
                       target_info) )
        return d
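    # SDMF keeps the whole file in a single FEC segment (num_segments is 0 or
    # 1), so the ciphertext is simply split into required_shares pieces,
    # NUL-padded to a common piece size, and handed to zfec in one encode()
    # call.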
    def _generate_shares(self, (shares_and_shareids,
                                required_shares, total_shares,
                                segment_size, data_length,
                                target_info),
                         seqnum, IV):
        self.log("_generate_shares")

        # we should know these by now
        privkey = self._privkey
        encprivkey = self._encprivkey
        pubkey = self._pubkey

        (shares, share_ids) = shares_and_shareids

        assert len(shares) == len(share_ids)
        assert len(shares) == total_shares
        all_shares = {}
        block_hash_trees = {}
        share_hash_leaves = [None] * len(shares)
        for i in range(len(shares)):
            share_data = shares[i]
            shnum = share_ids[i]
            all_shares[shnum] = share_data

            # build the block hash tree. SDMF has only one leaf.
            leaves = [hashutil.block_hash(share_data)]
            t = hashtree.HashTree(leaves)
            block_hash_trees[shnum] = block_hash_tree = list(t)
            share_hash_leaves[shnum] = t[0]
        for leaf in share_hash_leaves:
            assert leaf is not None
        share_hash_tree = hashtree.HashTree(share_hash_leaves)
        share_hash_chain = {}
        for shnum in range(total_shares):
            needed_hashes = share_hash_tree.needed_hashes(shnum)
            share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
                                              for i in needed_hashes ] )
        root_hash = share_hash_tree[0]
        assert len(root_hash) == 32
        self.log("my new root_hash is %s" % base32.b2a(root_hash))

        prefix = pack_prefix(seqnum, root_hash, IV,
                             required_shares, total_shares,
                             segment_size, data_length)

        # now pack the beginning of the share. All shares are the same up
        # to the signature, then they have divergent share hash chains,
        # then completely different block hash trees + IV + share data,
        # then they all share the same encprivkey at the end. The sizes
        # of everything are the same for all shares.

        signature = privkey.sign(prefix)

        verification_key = pubkey.serialize()

        final_shares = {}
        for shnum in range(total_shares):
            final_share = pack_share(prefix,
                                     verification_key,
                                     signature,
                                     share_hash_chain[shnum],
                                     block_hash_trees[shnum],
                                     all_shares[shnum],
                                     encprivkey)
            final_shares[shnum] = final_share
        return (seqnum, root_hash, final_shares, target_info)
    def _send_shares(self, (seqnum, root_hash, final_shares, target_info), IV):
        self.log("_send_shares")
        # we're finally ready to send out our shares. If we encounter any
        # surprises here, it's because somebody else is writing at the same
        # time. (Note: in the future, when we remove the _query_peers() step
        # and instead speculate about [or remember] which shares are where,
        # surprises here are *not* indications of UncoordinatedWriteError,
        # and we'll need to respond to them more gracefully.)

        target_map, shares_per_peer = target_info

        my_checkstring = pack_checkstring(seqnum, root_hash, IV)
        peer_messages = {}
        expected_old_shares = {}

        for shnum, peers in target_map.items():
            for (peerid, old_seqnum, old_root_hash) in peers:
                testv = [(0, len(my_checkstring), "le", my_checkstring)]
                new_share = final_shares[shnum]
                writev = [(0, new_share)]
                if peerid not in peer_messages:
                    peer_messages[peerid] = {}
                peer_messages[peerid][shnum] = (testv, writev, None)
                if peerid not in expected_old_shares:
                    expected_old_shares[peerid] = {}
                expected_old_shares[peerid][shnum] = (old_seqnum, old_root_hash)

        read_vector = [(0, len(my_checkstring))]

        # ok, send the messages!
        self._surprised = False
        dispatch_map = DictOfSets()

        dl = []
        for peerid, tw_vectors in peer_messages.items():

            write_enabler = self._node.get_write_enabler(peerid)
            renew_secret = self._node.get_renewal_secret(peerid)
            cancel_secret = self._node.get_cancel_secret(peerid)
            secrets = (write_enabler, renew_secret, cancel_secret)

            d = self._do_testreadwrite(peerid, secrets,
                                       tw_vectors, read_vector)
            d.addCallback(self._got_write_answer, tw_vectors, my_checkstring,
                          peerid, expected_old_shares[peerid], dispatch_map)
            dl.append(d)

        d = defer.DeferredList(dl)
        d.addCallback(lambda res: (self._surprised, dispatch_map))
        return d
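    # The test vector above compares the first len(my_checkstring) bytes of
    # whatever is already stored against our new packed checkstring with the
    # "le" operator: the write proceeds only if the existing
    # (version, seqnum, root_hash, IV) prefix sorts at or below the one we
    # are about to write, i.e. nobody has raced ahead of us with a newer
    # sequence number. Anything unexpected in the answer is flagged as a
    # surprise and treated as a possible uncoordinated write.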
    def _do_testreadwrite(self, peerid, secrets,
                          tw_vectors, read_vector):
        storage_index = self._node._uri.storage_index
        ss = self._storage_servers[peerid]
        d = ss.callRemote("slot_testv_and_readv_and_writev",
                          storage_index, secrets, tw_vectors, read_vector)
        return d
    def _got_write_answer(self, answer, tw_vectors, my_checkstring,
                          peerid, expected_old_shares,
                          dispatch_map):
        lp = self.log("_got_write_answer from %s" %
                      idlib.shortnodeid_b2a(peerid))
        wrote, read_data = answer

        surprised = False

        (new_seqnum,new_root_hash,new_IV) = unpack_checkstring(my_checkstring)

        if wrote:
            for shnum in tw_vectors:
                dispatch_map.add(shnum, (peerid, new_seqnum, new_root_hash))
        else:
            # surprise! our testv failed, so the write did not happen
            self.log("our testv failed, that write did not happen",
                     parent=lp, level=log.WEIRD)
            surprised = True

        for shnum, (old_cs,) in read_data.items():
            (old_seqnum, old_root_hash, IV) = unpack_checkstring(old_cs)

            if not wrote:
                dispatch_map.add(shnum, (peerid, old_seqnum, old_root_hash))

            if shnum not in expected_old_shares:
                # surprise! there was a share we didn't know about
                self.log("they had share %d that we didn't know about" % shnum,
                         parent=lp, level=log.WEIRD)
                surprised = True
            else:
                seqnum, root_hash = expected_old_shares[shnum]
                if seqnum is not None:
                    if seqnum != old_seqnum or root_hash != old_root_hash:
                        # surprise! somebody modified the share on us
                        self.log("somebody modified the share on us:"
                                 " shnum=%d: I thought they had #%d:R=%s,"
                                 " but testv reported #%d:R=%s" %
                                 (shnum,
                                  seqnum, base32.b2a(root_hash)[:4],
                                  old_seqnum, base32.b2a(old_root_hash)[:4]),
                                 parent=lp, level=log.WEIRD)
                        surprised = True
        if surprised:
            self._surprised = True

    def _log_dispatch_map(self, dispatch_map):
        for shnum, places in dispatch_map.items():
            sent_to = [(idlib.shortnodeid_b2a(peerid),
                        seqnum,
                        base32.b2a(root_hash)[:4])
                       for (peerid,seqnum,root_hash) in places]
            self.log(" share %d sent to: %s" % (shnum, sent_to),
                     level=log.NOISY)
    def _maybe_recover(self, (surprised, dispatch_map)):
        self.log("_maybe_recover, surprised=%s, dispatch_map:" % surprised,
                 level=log.NOISY)
        self._log_dispatch_map(dispatch_map)
        if not surprised:
            self.log(" no recovery needed")
            return

        self.log("We need recovery!", level=log.WEIRD)
        print "RECOVERY NOT YET IMPLEMENTED"
        # but dispatch_map will help us do it
        raise UncoordinatedWriteError("I was surprised!")

    def _done(self, res):
        now = time.time()
        self._status.timings["total"] = now - self._started
        self._status.set_active(False)
        self._status.set_status("Done")
        self._status.set_progress(1.0)
        return None

    def get_status(self):
        return self._status
# use client.create_mutable_file() to make one of these

class MutableFileNode:
    implements(IMutableFileNode)
    publish_class = Publish
    retrieve_class = Retrieve
    SIGNATURE_KEY_SIZE = 2048

    def __init__(self, client):
        self._client = client
        self._pubkey = None # filled in upon first read
        self._privkey = None # filled in if we're mutable
        self._required_shares = None # ditto
        self._total_shares = None # ditto
        self._sharemap = {} # known shares, shnum-to-[nodeids]

        self._current_data = None # SDMF: we're allowed to cache the contents
        self._current_roothash = None # ditto
        self._current_seqnum = None # ditto

    def __repr__(self):
        return "<%s %x %s %s>" % (self.__class__.__name__, id(self),
                                  self.is_readonly() and 'RO' or 'RW',
                                  hasattr(self, '_uri') and self._uri.abbrev())
    def init_from_uri(self, myuri):
        # we have the URI, but we have not yet retrieved the public
        # verification key, nor things like 'k' or 'N'. If and when someone
        # wants to get our contents, we'll pull from shares and fill those
        # in.
        self._uri = IMutableFileURI(myuri)
        if not self._uri.is_readonly():
            self._writekey = self._uri.writekey
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        self._fingerprint = self._uri.fingerprint
        # the following values are learned during Retrieval
        #  self._pubkey
        #  self._required_shares
        #  self._total_shares
        # and these are needed for Publish. They are filled in by Retrieval
        # if possible, otherwise by the first peer that Publish talks to.
        self._privkey = None
        self._encprivkey = None
        return self
    def create(self, initial_contents):
        """Call this when the filenode is first created. This will generate
        the keys, generate the initial shares, wait until at least numpeers
        are connected, allocate shares, and upload the initial
        contents. Returns a Deferred that fires (with the MutableFileNode
        instance you should use) when it completes.
        """
        self._required_shares = 3
        self._total_shares = 10
        d = defer.maybeDeferred(self._generate_pubprivkeys)
        def _generated( (pubkey, privkey) ):
            self._pubkey, self._privkey = pubkey, privkey
            pubkey_s = self._pubkey.serialize()
            privkey_s = self._privkey.serialize()
            self._writekey = hashutil.ssk_writekey_hash(privkey_s)
            self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
            self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
            self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
            self._readkey = self._uri.readkey
            self._storage_index = self._uri.storage_index
            # TODO: seqnum/roothash: really we mean "doesn't matter since
            # nobody knows about us yet"
            self._current_seqnum = 0
            self._current_roothash = "\x00"*32
            return self._publish(initial_contents)
        d.addCallback(_generated)
        return d
    def _generate_pubprivkeys(self):
        # RSA key generation for a 2048 bit key takes between 0.8 and 3.2 secs
        signer = rsa.generate(self.SIGNATURE_KEY_SIZE)
        verifier = signer.get_verifying_key()
        return verifier, signer

    def _publish(self, initial_contents):
        p = self.publish_class(self)
        self._client.notify_publish(p)
        d = p.publish(initial_contents)
        d.addCallback(lambda res: self)
        return d

    def _encrypt_privkey(self, writekey, privkey):
        enc = AES(writekey)
        crypttext = enc.process(privkey)
        return crypttext

    def _decrypt_privkey(self, enc_privkey):
        enc = AES(self._writekey)
        privkey = enc.process(enc_privkey)
        return privkey
    def _populate(self, stuff):
        # the Retrieval object calls this with values it discovers when
        # downloading the slot. This is how a MutableFileNode that was
        # created from a URI learns about its full key.
        pass

    def _populate_pubkey(self, pubkey):
        self._pubkey = pubkey
    def _populate_required_shares(self, required_shares):
        self._required_shares = required_shares
    def _populate_total_shares(self, total_shares):
        self._total_shares = total_shares
    def _populate_seqnum(self, seqnum):
        self._current_seqnum = seqnum
    def _populate_root_hash(self, root_hash):
        self._current_roothash = root_hash
    def _populate_privkey(self, privkey):
        self._privkey = privkey
    def _populate_encprivkey(self, encprivkey):
        self._encprivkey = encprivkey
    def get_write_enabler(self, peerid):
        assert len(peerid) == 20
        return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
    def get_renewal_secret(self, peerid):
        assert len(peerid) == 20
        crs = self._client.get_renewal_secret()
        frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
        return hashutil.bucket_renewal_secret_hash(frs, peerid)
    def get_cancel_secret(self, peerid):
        assert len(peerid) == 20
        ccs = self._client.get_cancel_secret()
        fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
        return hashutil.bucket_cancel_secret_hash(fcs, peerid)

    def get_writekey(self):
        return self._writekey
    def get_readkey(self):
        return self._readkey
    def get_storage_index(self):
        return self._storage_index
    def get_privkey(self):
        return self._privkey
    def get_encprivkey(self):
        return self._encprivkey
    def get_pubkey(self):
        return self._pubkey

    def get_required_shares(self):
        return self._required_shares
    def get_total_shares(self):
        return self._total_shares
    def get_uri(self):
        return self._uri.to_string()
    def get_size(self):
        return "?" # TODO: this is likely to cause problems, not being an int
    def get_readonly(self):
        if self.is_readonly():
            return self
        ro = MutableFileNode(self._client)
        ro.init_from_uri(self._uri.get_readonly())
        return ro

    def get_readonly_uri(self):
        return self._uri.get_readonly().to_string()

    def is_mutable(self):
        return self._uri.is_mutable()
    def is_readonly(self):
        return self._uri.is_readonly()

    def __hash__(self):
        return hash((self.__class__, self._uri))
    def __cmp__(self, them):
        if cmp(type(self), type(them)):
            return cmp(type(self), type(them))
        if cmp(self.__class__, them.__class__):
            return cmp(self.__class__, them.__class__)
        return cmp(self._uri, them._uri)

    def get_verifier(self):
        return IMutableFileURI(self._uri).get_verifier()

    def check(self):
        verifier = self.get_verifier()
        return self._client.getServiceNamed("checker").check(verifier)
    def download(self, target):
        # fake it. TODO: make this cleaner.
        d = self.download_to_data()
        def _done(data):
            target.open(len(data))
            target.write(data)
            target.close()
            return target.finish()
        d.addCallback(_done)
        return d

    def download_to_data(self):
        r = self.retrieve_class(self)
        self._client.notify_retrieve(r)
        return r.retrieve()

    def replace(self, newdata):
        r = self.retrieve_class(self)
        self._client.notify_retrieve(r)
        d = r.retrieve()
        d.addCallback(lambda res: self._publish(newdata))
        return d
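    # replace() deliberately performs a fresh Retrieve before publishing: the
    # retrieval populates _current_seqnum and _current_roothash (via
    # _populate_seqnum/_populate_root_hash), which Publish.publish() asserts
    # on ("must read before replace") and uses to choose the new sequence
    # number.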
class MutableWatcher(service.MultiService):
    MAX_PUBLISH_STATUSES = 20
    MAX_RETRIEVE_STATUSES = 20
    name = "mutable-watcher"

    def __init__(self):
        service.MultiService.__init__(self)
        self._all_publish = weakref.WeakKeyDictionary()
        self._recent_publish_status = []
        self._all_retrieve = weakref.WeakKeyDictionary()
        self._recent_retrieve_status = []

    def notify_publish(self, p):
        self._all_publish[p] = None
        self._recent_publish_status.append(p.get_status())
        while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES:
            self._recent_publish_status.pop(0)

    def list_all_publish(self):
        return self._all_publish.keys()
    def list_active_publish(self):
        return [p.get_status() for p in self._all_publish.keys()
                if p.get_status().get_active()]
    def list_recent_publish(self):
        return self._recent_publish_status

    def notify_retrieve(self, r):
        self._all_retrieve[r] = None
        self._recent_retrieve_status.append(r.get_status())
        while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES:
            self._recent_retrieve_status.pop(0)

    def list_all_retrieve(self):
        return self._all_retrieve.keys()
    def list_active_retrieve(self):
        return [p.get_status() for p in self._all_retrieve.keys()
                if p.get_status().get_active()]
    def list_recent_retrieve(self):
        return self._recent_retrieve_status