import os, struct
from itertools import islice
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from foolscap.eventual import eventually
from allmydata.interfaces import IMutableFileNode, IMutableFileURI
from allmydata.util import hashutil, mathutil, idlib
from allmydata.uri import WriteableSSKFileURI
from allmydata.Crypto.Cipher import AES
from allmydata import hashtree, codec
from allmydata.encode import NotEnoughPeersError


class NeedMoreDataError(Exception):
    def __init__(self, needed_bytes):
        Exception.__init__(self)
        self.needed_bytes = needed_bytes

class UncoordinatedWriteError(Exception):
    pass

class CorruptShareError(Exception):
    def __init__(self, peerid, shnum, reason):
        self.peerid = peerid
        self.shnum = shnum
        self.reason = reason
    def __repr__(self):
        short_peerid = idlib.nodeid_b2a(self.peerid)[:8]
        return "<CorruptShareError peerid=%s shnum[%d]: %s>" % (short_peerid,
                                                                self.shnum,
                                                                self.reason)

PREFIX = ">BQ32s16s" # each version has a different prefix
SIGNED_PREFIX = ">BQ32s16s BBQQ" # this is covered by the signature
HEADER = ">BQ32s16s BBQQ LLLLQQ" # includes offsets
HEADER_LENGTH = struct.calcsize(HEADER)
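
# For reference, a sketch of the 107-byte header implied by HEADER, derived
# from the pack/unpack code in this file (sizes in bytes):
#
#   offset  size  field
#        0     1  version (0 for SDMF)
#        1     8  seqnum
#        9    32  root_hash
#       41    16  IV
#       57     1  k (required_shares)
#       58     1  N (total_shares)
#       59     8  segment_size
#       67     8  data_length
#       75     4  offset of signature
#       79     4  offset of share_hash_chain
#       83     4  offset of block_hash_tree
#       87     4  offset of share_data
#       91     8  offset of enc_privkey
#       99     8  offset of EOF
#
# the verification key begins at HEADER_LENGTH (107), and each offset above
# points at the end of the preceding section.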

def unpack_prefix_and_signature(data):
    assert len(data) >= HEADER_LENGTH
    o = {}
    prefix = data[:struct.calcsize(SIGNED_PREFIX)]

    (version,
     seqnum,
     root_hash,
     IV,
     k, N, segsize, datalen,
     o['signature'],
     o['share_hash_chain'],
     o['block_hash_tree'],
     o['share_data'],
     o['enc_privkey'],
     o['EOF']) = struct.unpack(HEADER, data[:HEADER_LENGTH])

    assert version == 0
    if len(data) < o['share_hash_chain']:
        raise NeedMoreDataError(o['share_hash_chain'])

    pubkey_s = data[HEADER_LENGTH:o['signature']]
    signature = data[o['signature']:o['share_hash_chain']]

    return (seqnum, root_hash, IV, k, N, segsize, datalen,
            pubkey_s, signature, prefix)
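
# callers hand unpack_prefix_and_signature() however many bytes they fetched
# speculatively; if that isn't enough, NeedMoreDataError.needed_bytes tells
# them how much to fetch on the retry (see Retrieve._query_failed below).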

def unpack_share(data):
    assert len(data) >= HEADER_LENGTH
    o = {}
    (version,
     seqnum,
     root_hash,
     IV,
     k, N, segsize, datalen,
     o['signature'],
     o['share_hash_chain'],
     o['block_hash_tree'],
     o['share_data'],
     o['enc_privkey'],
     o['EOF']) = struct.unpack(HEADER, data[:HEADER_LENGTH])

    assert version == 0
    if len(data) < o['EOF']:
        raise NeedMoreDataError(o['EOF'])

    pubkey = data[HEADER_LENGTH:o['signature']]
    signature = data[o['signature']:o['share_hash_chain']]
    share_hash_chain_s = data[o['share_hash_chain']:o['block_hash_tree']]
    share_hash_format = ">H32s"
    hsize = struct.calcsize(share_hash_format)
    assert len(share_hash_chain_s) % hsize == 0, len(share_hash_chain_s)
    share_hash_chain = []
    for i in range(0, len(share_hash_chain_s), hsize):
        chunk = share_hash_chain_s[i:i+hsize]
        (hid, h) = struct.unpack(share_hash_format, chunk)
        share_hash_chain.append( (hid, h) )
    block_hash_tree_s = data[o['block_hash_tree']:o['share_data']]
    assert len(block_hash_tree_s) % 32 == 0, len(block_hash_tree_s)
    block_hash_tree = []
    for i in range(0, len(block_hash_tree_s), 32):
        block_hash_tree.append(block_hash_tree_s[i:i+32])

    share_data = data[o['share_data']:o['enc_privkey']]
    enc_privkey = data[o['enc_privkey']:o['EOF']]

    return (seqnum, root_hash, IV, k, N, segsize, datalen,
            pubkey, signature, share_hash_chain, block_hash_tree,
            share_data, enc_privkey)

def pack_checkstring(seqnum, root_hash, IV):
    return struct.pack(PREFIX,
                       0, # version
                       seqnum,
                       root_hash,
                       IV)

def unpack_checkstring(checkstring):
    cs_len = struct.calcsize(PREFIX)
    version, seqnum, root_hash, IV = struct.unpack(PREFIX, checkstring[:cs_len])
    assert version == 0 # TODO: just ignore the share
    return (seqnum, root_hash, IV)
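
# round-trip sketch (hypothetical values, for illustration only):
#   cs = pack_checkstring(3, "\x22"*32, "\x33"*16)
#   assert unpack_checkstring(cs) == (3, "\x22"*32, "\x33"*16)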

def pack_prefix(seqnum, root_hash, IV,
                required_shares, total_shares,
                segment_size, data_length):
    prefix = struct.pack(SIGNED_PREFIX,
                         0, # version
                         seqnum,
                         root_hash,
                         IV,
                         required_shares,
                         total_shares,
                         segment_size,
                         data_length,
                         )
    return prefix

def pack_offsets(verification_key_length, signature_length,
                 share_hash_chain_length, block_hash_tree_length,
                 share_data_length, encprivkey_length):
    post_offset = HEADER_LENGTH
    offsets = {}
    o1 = offsets['signature'] = post_offset + verification_key_length
    o2 = offsets['share_hash_chain'] = o1 + signature_length
    o3 = offsets['block_hash_tree'] = o2 + share_hash_chain_length
    o4 = offsets['share_data'] = o3 + block_hash_tree_length
    o5 = offsets['enc_privkey'] = o4 + share_data_length
    o6 = offsets['EOF'] = o5 + encprivkey_length

    return struct.pack(">LLLLQQ",
                       offsets['signature'],
                       offsets['share_hash_chain'],
                       offsets['block_hash_tree'],
                       offsets['share_data'],
                       offsets['enc_privkey'],
                       offsets['EOF'])
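
# A complete share, as assembled by Publish._generate_shares below, is the
# concatenation (in this order):
#   signed prefix (the SIGNED_PREFIX fields)
#   offset table (">LLLLQQ", from pack_offsets)
#   verification key
#   signature (covering the signed prefix)
#   share_hash_chain: a sequence of ">H32s" (index, hash) pairs
#   block_hash_tree: a sequence of 32-byte hashes
#   share_data
#   enc_privkey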


class Retrieve:
    def __init__(self, filenode):
        self._node = filenode
        self._contents = None
        # if the filenode already has a copy of the pubkey, use it. Otherwise
        # we'll grab a copy from the first peer we talk to.
        self._pubkey = filenode.get_pubkey()
        self._storage_index = filenode.get_storage_index()
        self._readkey = filenode.get_readkey()

    def log(self, msg):
        self._node._client.log(msg)

    def retrieve(self):
        """Retrieve the filenode's current contents. Returns a Deferred that
        fires with a string when the contents have been retrieved."""

        # 1: make a guess as to how many peers we should send requests to. We
        # want to hear from k+EPSILON (k because we have to, EPSILON extra
        # because that helps us resist rollback attacks). [TRADEOFF:
        # EPSILON>0 means extra work] [TODO: implement EPSILON>0]
        # 2: build the permuted peerlist, taking the first k+E peers
        # 3: send readv requests to all of them in parallel, asking for the
        # first 2KB of data from all shares
        # 4: when the first of the responses comes back, extract information:
        # 4a: extract the pubkey, hash it, compare against the URI. If this
        # check fails, log a WEIRD and ignore the peer.
        # 4b: extract the prefix (seqnum, roothash, k, N, segsize, datalength)
        # and verify the signature on it. If this is wrong, log a WEIRD
        # and ignore the peer. Save the prefix string in a dict that's
        # keyed by (seqnum,roothash) and has (prefixstring, sharemap) as
        # values. We'll use the prefixstring again later to avoid doing
        # multiple signature checks
        # 4c: extract the share size (offset of the last byte of sharedata).
        # if it is larger than 2k, send new readv requests to pull down
        # the rest of the share
        # 4d: if the extracted 'k' is more than we guessed, rebuild a larger
        # permuted peerlist and send out more readv requests.
        # 5: as additional responses come back, extract the prefix and compare
        # against the ones we've already seen. If they match, add the
        # peerid to the corresponding sharemap dict
        # 6: [TRADEOFF]: if EPSILON==0, when we get k responses for the
        # same (seqnum,roothash) key, attempt to reconstruct that data.
        # if EPSILON>0, wait for k+EPSILON responses, then attempt to
        # reconstruct the most popular version. If we do not have enough
        # shares and there are still requests outstanding, wait. If there
        # are not still requests outstanding (todo: configurable), send
        # more requests. Never send queries to more than 2*N servers. If
        # we've run out of servers, fail.
        # 7: if we discover corrupt shares during the reconstruction process,
        # remove that share from the sharemap and start step #6 again.

        initial_query_count = 5
        self._read_size = 2000

        # we might not know how many shares we need yet.
        self._required_shares = self._node.get_required_shares()
        self._total_shares = self._node.get_total_shares()
        self._segsize = None
        self._datalength = None

        d = defer.succeed(initial_query_count)
        d.addCallback(self._choose_initial_peers)
        d.addCallback(self._send_initial_requests)
        d.addCallback(lambda res: self._contents)
        return d
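
    # intended call pattern for the state machine above (a sketch; 'node'
    # stands for a MutableFileNode that already knows its URI):
    #   r = Retrieve(node)
    #   d = r.retrieve()
    #   d.addCallback(lambda contents: ...) # fires with the plaintext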

    def _choose_initial_peers(self, numqueries):
        n = self._node
        full_peerlist = n._client.get_permuted_peers(self._storage_index,
                                                     include_myself=True)
        # _peerlist is a list of (peerid,conn) tuples for peers that are
        # worth talking to. This starts with the first numqueries in the
        # permuted list. If that's not enough to get us a recoverable
        # version, we expand this to include the first 2*total_shares peerids
        # (assuming we learn what total_shares is from one of the first
        # few queries)
        self._peerlist = [(p[1],p[2])
                          for p in islice(full_peerlist, numqueries)]
        # _peerlist_limit is the query limit we used to build this list. If
        # we later increase this limit, it may be useful to re-scan the
        # permuted list.
        self._peerlist_limit = numqueries
        return self._peerlist

    def _send_initial_requests(self, peerlist):
        self._running = True
        self._used_peers = set()
        self._bad_peerids = set()
        self._valid_versions = {} # maps (seqnum,root_hash,IV) to (prefix, sharemap)
        self._queries_outstanding = set()
        self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
        self._peer_storage_servers = {}
        for (peerid, conn) in peerlist:
            self._queries_outstanding.add(peerid)
            self._do_query(conn, peerid, self._storage_index, self._read_size,
                           self._peer_storage_servers)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.

        d = self._done_deferred = defer.Deferred()
        return d

    def _do_query(self, conn, peerid, storage_index, readsize,
                  peer_storage_servers):
        self._queries_outstanding.add(peerid)
        if peerid in peer_storage_servers:
            d = defer.succeed(peer_storage_servers[peerid])
        else:
            d = conn.callRemote("get_service", "storageserver")
            def _got_storageserver(ss):
                peer_storage_servers[peerid] = ss
                return ss
            d.addCallback(_got_storageserver)
        d.addCallback(lambda ss: ss.callRemote("readv_slots", [(0, readsize)]))
        d.addCallback(self._got_results, peerid, readsize)
        d.addErrback(self._query_failed, peerid, (conn, storage_index,
                                                  peer_storage_servers))
        return d

    def _deserialize_pubkey(self, pubkey_s):
        # TODO: deserialize the verification key
        raise NotImplementedError

    def _validate_share(self, root_hash, shnum, data):
        # TODO: check the block hash tree and share hash chain against
        # root_hash; if the share is bad, do something like:
        #   raise CorruptShareError(peerid, shnum, "explanation")
        # and on success, return the share data.
        raise NotImplementedError

    def _got_results(self, datavs, peerid, readsize):
        self._queries_outstanding.discard(peerid)
        self._used_peers.add(peerid)
        if not self._running:
            return

        for shnum,datav in datavs.items():
            data = datav[0]
            (seqnum, root_hash, IV, k, N, segsize, datalength,
             pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)

            if not self._pubkey:
                fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
                if fingerprint != self._node._fingerprint:
                    raise CorruptShareError(peerid, shnum,
                                            "pubkey doesn't match fingerprint")
                self._pubkey = self._deserialize_pubkey(pubkey_s)

            verinfo = (seqnum, root_hash, IV)
            if verinfo not in self._valid_versions:
                # it's a new pair. Verify the signature.
                valid = self._pubkey.verify(prefix, signature)
                if not valid:
                    raise CorruptShareError(peerid, shnum,
                                            "signature is invalid")
                # ok, it's a valid verinfo. Add it to the list of validated
                # versions.
                self._valid_versions[verinfo] = (prefix, DictOfSets())

                # and make a note of the other parameters we've just learned
                if self._required_shares is None:
                    self._required_shares = k
                if self._total_shares is None:
                    self._total_shares = N
                if self._segsize is None:
                    self._segsize = segsize
                if self._datalength is None:
                    self._datalength = datalength

            # we've already seen this pair, and checked the signature so we
            # know it's a valid candidate. Accumulate the share info, if
            # there's enough data present. If not, raise NeedMoreDataError,
            # which will trigger a re-fetch.
            _ignored = unpack_share(data)
            self._valid_versions[verinfo][1].add(shnum, (peerid, data))

        self._check_for_done()

    def _query_failed(self, f, peerid, stuff):
        self._queries_outstanding.discard(peerid)
        self._used_peers.add(peerid)
        if not self._running:
            return
        if f.check(NeedMoreDataError):
            # ah, just re-send the query then.
            self._read_size = max(self._read_size, f.value.needed_bytes)
            (conn, storage_index, peer_storage_servers) = stuff
            self._do_query(conn, peerid, storage_index, self._read_size,
                           peer_storage_servers)
            return
        self._bad_peerids.add(peerid)
        short_sid = idlib.b2a(self._storage_index)[:6]
        if f.check(CorruptShareError):
            self.log("WEIRD: bad share for %s: %s" % (short_sid, f))
        else:
            self.log("WEIRD: other error for %s: %s" % (short_sid, f))
        self._check_for_done()

    def _check_for_done(self):
        versionmap = DictOfSets()
        for verinfo, (prefix, sharemap) in self._valid_versions.items():
            if len(sharemap) >= self._required_shares:
                # this one looks retrievable
                d = defer.maybeDeferred(self._extract_data, verinfo, sharemap)
                def _problem(f):
                    if f.check(CorruptShareError):
                        # _extract_data is responsible for removing the bad
                        # share, so we can just try again
                        eventually(self._check_for_done)
                        return
                    return f
                d.addCallbacks(self._done, _problem)
                return

        # we don't have enough shares yet. Should we send out more queries?
        if self._queries_outstanding:
            # there are some running, so just wait for them to come back.
            # TODO: if our initial guess at k was too low, waiting for these
            # responses before sending new queries will increase our latency,
            # so we could speed things up by sending new requests earlier.
            return

        # no more queries are outstanding. Can we send out more? First,
        # should we be looking at more peers?
        if self._total_shares is not None:
            search_distance = self._total_shares * 2
        else:
            search_distance = 20
        if self._peerlist_limit < search_distance:
            # we might be able to get some more peers from the list
            peers = self._node._client.get_permuted_peers(self._storage_index,
                                                          include_myself=True)
            self._peerlist = [(p[1],p[2])
                              for p in islice(peers, search_distance)]
            self._peerlist_limit = search_distance
        # are there any peers on the list that we haven't used?
        new_query_peers = []
        for (peerid, conn) in self._peerlist:
            if peerid not in self._used_peers:
                new_query_peers.append( (peerid, conn) )
                if len(new_query_peers) > 5:
                    # only query in batches of 5. TODO: this is pretty
                    # arbitrary, really I want this to be something like
                    # k - max(known_version_sharecounts) + some extra
                    break
        if new_query_peers:
            for (peerid, conn) in new_query_peers:
                self._do_query(conn, peerid,
                               self._storage_index, self._read_size,
                               self._peer_storage_servers)
            # we'll retrigger when those queries come back
            return

        # we've used up all the peers we're allowed to search. Failure.
        return self._done(failure.Failure(NotEnoughPeersError()))

    def _extract_data(self, verinfo, sharemap):
        # sharemap is a dict which maps shnum to [(peerid,data)..] sets.
        (seqnum, root_hash, IV) = verinfo

        # first, validate each share that we haven't validated yet. We use
        # self._valid_shares to remember which ones we've already checked.

        self._valid_shares = set() # set of (peerid,data) tuples
        shares = {}
        for shnum, shareinfos in sharemap.items():
            for shareinfo in shareinfos:
                if shareinfo in self._valid_shares:
                    continue
                (peerid,data) = shareinfo
                try:
                    # The (seqnum+root_hash+IV) tuple for this share was
                    # already verified: specifically, all shares in the
                    # sharemap have a (seqnum+root_hash+IV) pair that was
                    # present in a validly signed prefix. The remainder of
                    # the prefix for this particular share has *not* been
                    # validated, but we don't care since we don't use it.
                    # self._validate_share() is required to check the hashes
                    # on the share data (and hash chains) to make sure they
                    # match root_hash, but is not required (and is in fact
                    # prohibited, because we don't validate the prefix on all
                    # shares) from using anything else in the share.
                    sharedata = self._validate_share(root_hash, shnum, data)
                except CorruptShareError, e:
                    self.log("WEIRD: share was corrupt: %s" % e)
                    sharemap[shnum].discard(shareinfo)
                    # If there are enough remaining shares, _check_for_done()
                    # will try again
                    raise
                self._valid_shares.add(shareinfo)
                shares[shnum] = sharedata
        # at this point, all shares in the sharemap are valid, and they're
        # all for the same seqnum+root_hash version, so it's now down to
        # doing FEC and decrypt.
        d = defer.maybeDeferred(self._decode, shares)
        d.addCallback(self._decrypt, IV)
        return d

    def _decode(self, shares_dict):
        # shares_dict is a dict mapping shnum to share data, but the codec
        # wants two lists: one of share ids and one of share data strings
        shareids = []; shares = []
        for shareid, share in shares_dict.items():
            shareids.append(shareid)
            shares.append(share)

        fec = codec.CRSDecoder()
        # we ought to know these values by now
        assert self._segsize is not None
        assert self._required_shares is not None
        assert self._total_shares is not None
        params = "%d-%d-%d" % (self._segsize,
                               self._required_shares, self._total_shares)
        fec.set_serialized_params(params)

        d = fec.decode(shares, shareids)
        def _done(buffers):
            segment = "".join(buffers)
            segment = segment[:self._datalength]
            return segment
        d.addCallback(_done)
        return d

    def _decrypt(self, crypttext, IV):
        key = hashutil.ssk_readkey_data_hash(IV, self._readkey)
        decryptor = AES.new(key=key, mode=AES.MODE_CTR, counterstart="\x00"*16)
        plaintext = decryptor.decrypt(crypttext)
        return plaintext

    def _done(self, contents):
        self._running = False
        self._contents = contents
        eventually(self._done_deferred.callback, contents)


class DictOfSets(dict):
    def add(self, key, value):
        if key in self:
            self[key].add(value)
        else:
            self[key] = set([value])
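
# usage sketch (hypothetical values):
#   m = DictOfSets()
#   m.add(3, ("peerid-a", 1, "R"))
#   m.add(3, ("peerid-b", 1, "R"))
#   m[3] is now a set containing both tuples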


class Publish:
    """I represent a single act of publishing the mutable file to the grid."""

    def __init__(self, filenode):
        self._node = filenode

    def publish(self, newdata):
        """Publish the filenode's current contents. Returns a Deferred that
        fires (with None) when the publish has done as much work as it's ever
        going to do, or errbacks with UncoordinatedWriteError if it detects a
        simultaneous write."""

        # 1: generate shares (SDMF: files are small, so we can do it in RAM)
        # 2: perform peer selection, get candidate servers
        # 2a: send queries to n+epsilon servers, to determine current shares
        # 2b: based upon responses, create target map
        # 3: send slot_testv_and_readv_and_writev messages
        # 4: as responses return, update share-dispatch table
        # 4a: may need to run recovery algorithm
        # 5: when enough responses are back, we're done

        old_roothash = self._node._current_roothash
        old_seqnum = self._node._current_seqnum

        readkey = self._node.get_readkey()
        required_shares = self._node.get_required_shares()
        total_shares = self._node.get_total_shares()
        privkey = self._node.get_privkey()
        encprivkey = self._node.get_encprivkey()
        pubkey = self._node.get_pubkey()

        IV = os.urandom(16)

        d = defer.succeed(newdata)
        d.addCallback(self._encrypt_and_encode, readkey, IV,
                      required_shares, total_shares)
        d.addCallback(self._generate_shares, old_seqnum+1,
                      privkey, encprivkey, pubkey)

        d.addCallback(self._query_peers, total_shares)
        d.addCallback(self._send_shares, IV)
        d.addCallback(self._maybe_recover)
        d.addCallback(lambda res: None)
        return d
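
    # intended call pattern (a sketch; 'node' stands for a MutableFileNode
    # that already holds its keys, as in MutableFileNode._publish below):
    #   p = Publish(node)
    #   d = p.publish(new_contents)
    #   d.addCallback(lambda ignored: ...) # fires with None on success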

    def _encrypt_and_encode(self, newdata, readkey, IV,
                            required_shares, total_shares):
        key = hashutil.ssk_readkey_data_hash(IV, readkey)
        enc = AES.new(key=key, mode=AES.MODE_CTR, counterstart="\x00"*16)
        crypttext = enc.encrypt(newdata)

        # now apply FEC
        self.MAX_SEGMENT_SIZE = 1024*1024
        segment_size = min(self.MAX_SEGMENT_SIZE, len(crypttext))
        # this must be a multiple of required_shares
        segment_size = mathutil.next_multiple(segment_size,
                                              required_shares)
        self.num_segments = mathutil.div_ceil(len(crypttext), segment_size)
        assert self.num_segments == 1 # SDMF restrictions
        fec = codec.CRSEncoder()
        fec.set_params(segment_size, required_shares, total_shares)
        piece_size = fec.get_block_size()
        crypttext_pieces = []
        for offset in range(0, len(crypttext), piece_size):
            piece = crypttext[offset:offset+piece_size]
            if len(piece) < piece_size:
                pad_size = piece_size - len(piece)
                piece = piece + "\x00"*pad_size
            crypttext_pieces.append(piece)
            assert len(piece) == piece_size

        d = fec.encode(crypttext_pieces)
        d.addCallback(lambda shares:
                      (shares, required_shares, total_shares,
                       segment_size, len(crypttext), IV) )
        return d

    def _generate_shares(self, (shares_and_shareids,
                                required_shares, total_shares,
                                segment_size, data_length, IV),
                         seqnum, privkey, encprivkey, pubkey):
        (shares, share_ids) = shares_and_shareids

        assert len(shares) == len(share_ids)
        assert len(shares) == total_shares
        all_shares = {}
        block_hash_trees = {}
        share_hash_leaves = [None] * len(shares)
        for i in range(len(shares)):
            share_data = shares[i]
            shnum = share_ids[i]
            all_shares[shnum] = share_data

            # build the block hash tree. SDMF has only one leaf.
            leaves = [hashutil.block_hash(share_data)]
            t = hashtree.HashTree(leaves)
            block_hash_trees[shnum] = block_hash_tree = list(t)
            share_hash_leaves[shnum] = t[0]
        for leaf in share_hash_leaves:
            assert leaf is not None
        share_hash_tree = hashtree.HashTree(share_hash_leaves)
        share_hash_chain = {}
        for shnum in range(total_shares):
            needed_hashes = share_hash_tree.needed_hashes(shnum)
            share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
                                              for i in needed_hashes ] )
        root_hash = share_hash_tree[0]
        assert len(root_hash) == 32

        prefix = pack_prefix(seqnum, root_hash, IV,
                             required_shares, total_shares,
                             segment_size, data_length)

        # now pack the beginning of the share. All shares are the same up
        # to the signature, then they have divergent share hash chains,
        # then completely different block hash trees + IV + share data,
        # then they all share the same encprivkey at the end. The sizes
        # of everything are the same for all shares.

        signature = privkey.sign(prefix)

        verification_key = pubkey.serialize()

        final_shares = {}
        for shnum in range(total_shares):
            shc = share_hash_chain[shnum]
            share_hash_chain_s = "".join([struct.pack(">H32s", i, shc[i])
                                          for i in sorted(shc.keys())])
            bht = block_hash_trees[shnum]
            block_hash_tree_s = "".join(bht)
            share_data = all_shares[shnum]
            offsets = pack_offsets(len(verification_key),
                                   len(signature),
                                   len(share_hash_chain_s),
                                   len(block_hash_tree_s),
                                   len(share_data),
                                   len(encprivkey))

            final_shares[shnum] = "".join([prefix,
                                           offsets,
                                           verification_key,
                                           signature,
                                           share_hash_chain_s,
                                           block_hash_tree_s,
                                           share_data,
                                           encprivkey])
        return (seqnum, root_hash, final_shares)
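
    # hash-tree sketch: each share's block hashes into that share's block
    # hash tree (a single leaf here, since SDMF has one segment); each block
    # hash tree root becomes a leaf of the common share hash tree; the share
    # hash tree root (root_hash) is covered by the signed prefix. The
    # share_hash_chain stored in each share carries just the tree nodes a
    # reader needs to link that share back up to root_hash.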

    def _query_peers(self, (seqnum, root_hash, final_shares), total_shares):
        self._new_seqnum = seqnum
        self._new_root_hash = root_hash
        self._new_shares = final_shares

        storage_index = self._node.get_storage_index()
        peerlist = self._node._client.get_permuted_peers(storage_index,
                                                         include_myself=False)
        # we don't include ourselves in the N peers, but we *do* push an
        # extra copy of share[0] to ourselves so we're more likely to have
        # the signing key around later. This way, even if all the servers die
        # and the directory contents are unrecoverable, at least we can still
        # push out a new copy with brand-new contents.
        # TODO: actually push this copy

        current_share_peers = DictOfSets()
        reachable_peers = {} # maps peerid to permuted index

        EPSILON = total_shares / 2
        partial_peerlist = islice(peerlist, total_shares + EPSILON)
        peer_storage_servers = {}
        dl = []
        for (permutedid, peerid, conn) in partial_peerlist:
            d = self._do_query(conn, peerid, peer_storage_servers)
            d.addCallback(self._got_query_results,
                          peerid, permutedid,
                          reachable_peers, current_share_peers)
            dl.append(d)
        d = defer.DeferredList(dl)
        d.addCallback(self._got_all_query_results,
                      total_shares, reachable_peers, seqnum,
                      current_share_peers, peer_storage_servers)
        # TODO: add an errback too, probably to ignore that peer
        return d

    def _do_query(self, conn, peerid, peer_storage_servers):
        d = conn.callRemote("get_service", "storageserver")
        def _got_storageserver(ss):
            peer_storage_servers[peerid] = ss
            return ss.callRemote("readv_slots", [(0, 2000)])
        d.addCallback(_got_storageserver)
        return d

    def _got_query_results(self, datavs, peerid, permutedid,
                           reachable_peers, current_share_peers):
        assert isinstance(datavs, dict)
        reachable_peers[peerid] = permutedid
        for shnum, datav in datavs.items():
            assert len(datav) == 1
            data = datav[0]
            r = unpack_share(data)
            share = (shnum, r[0], r[1]) # shnum,seqnum,R
            current_share_peers.add(shnum, (peerid, r[0], r[1]) )

    def _got_all_query_results(self, res,
                               total_shares, reachable_peers, new_seqnum,
                               current_share_peers, peer_storage_servers):
        # now that we know everything about the shares currently out there,
        # decide where to place the new shares.

        # if an old share X is on a node, put the new share X there too.
        # TODO: 1: redistribute shares to achieve one-per-peer, by copying
        # shares from existing peers to new (less-crowded) ones. The
        # old shares must still be updated.
        # TODO: 2: move those shares instead of copying them, to reduce future
        # update bandwidth.

        shares_needing_homes = range(total_shares)
        target_map = DictOfSets() # maps shnum to set((peerid,oldseqnum,oldR))
        shares_per_peer = DictOfSets()
        for shnum in range(total_shares):
            for oldplace in current_share_peers.get(shnum, []):
                (peerid, seqnum, R) = oldplace
                if seqnum >= new_seqnum:
                    raise UncoordinatedWriteError()
                target_map.add(shnum, oldplace)
                shares_per_peer.add(peerid, shnum)
                if shnum in shares_needing_homes:
                    shares_needing_homes.remove(shnum)

        # now choose homes for the remaining shares. We prefer peers with the
        # fewest target shares, then peers with the lowest permuted index. If
        # there are no shares already in place, this will assign them
        # one-per-peer in the normal permuted order.
        while shares_needing_homes:
            if not reachable_peers:
                raise NotEnoughPeersError("ran out of peers during upload")
            shnum = shares_needing_homes.pop(0)
            possible_homes = reachable_peers.keys()
            possible_homes.sort(lambda a,b:
                                cmp( (len(shares_per_peer.get(a, [])),
                                      reachable_peers[a]),
                                     (len(shares_per_peer.get(b, [])),
                                      reachable_peers[b]) ))
            target_peerid = possible_homes[0]
            target_map.add(shnum, (target_peerid, None, None) )
            shares_per_peer.add(target_peerid, shnum)

        assert not shares_needing_homes

        return (target_map, peer_storage_servers)
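
    # placement example (hypothetical values): if shares_per_peer is
    # {A: set([0,1]), B: set([2])} and reachable_peers is {A:0, B:1, C:2},
    # the sort above orders C (no shares yet), then B, then A, so the next
    # homeless share goes to C.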

    def _send_shares(self, (target_map, peer_storage_servers), IV ):
        # we're finally ready to send out our shares. If we encounter any
        # surprises here, it's because somebody else is writing at the same
        # time. (Note: in the future, when we remove the _query_peers() step
        # and instead speculate about [or remember] which shares are where,
        # surprises here are *not* indications of UncoordinatedWriteError,
        # and we'll need to respond to them more gracefully.)

        my_checkstring = pack_checkstring(self._new_seqnum,
                                          self._new_root_hash, IV)
        peer_messages = {}
        expected_old_shares = {}

        for shnum, peers in target_map.items():
            for (peerid, old_seqnum, old_root_hash) in peers:
                testv = [(0, len(my_checkstring), "ge", my_checkstring)]
                new_share = self._new_shares[shnum]
                writev = [(0, new_share)]
                if peerid not in peer_messages:
                    peer_messages[peerid] = {}
                peer_messages[peerid][shnum] = (testv, writev, None)
                if peerid not in expected_old_shares:
                    expected_old_shares[peerid] = {}
                expected_old_shares[peerid][shnum] = (old_seqnum, old_root_hash)

        read_vector = [(0, len(my_checkstring))]

        # ok, send the messages!
        self._surprised = False
        dispatch_map = DictOfSets()
        dl = []
        for peerid, tw_vectors in peer_messages.items():
            write_enabler = self._node.get_write_enabler(peerid)
            renew_secret = self._node.get_renewal_secret(peerid)
            cancel_secret = self._node.get_cancel_secret(peerid)
            secrets = (write_enabler, renew_secret, cancel_secret)

            d = self._do_testreadwrite(peerid, peer_storage_servers, secrets,
                                       tw_vectors, read_vector)
            d.addCallback(self._got_write_answer, tw_vectors, my_checkstring,
                          peerid, expected_old_shares[peerid], dispatch_map)
            dl.append(d)

        d = defer.DeferredList(dl)
        d.addCallback(lambda res: (self._surprised, dispatch_map))
        return d
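
    # message-format sketch (as built above): each peer receives a dict
    #   { shnum: ( [ (offset, length, op, specimen) ], # test vector
    #              [ (offset, newdata) ],              # write vector
    #              None ) }                            # new length (unused)
    # plus a read vector [(0, len(checkstring))], so the response reports
    # what was present before the write.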

    def _do_testreadwrite(self, peerid, peer_storage_servers, secrets,
                          tw_vectors, read_vector):
        conn = peer_storage_servers[peerid]
        storage_index = self._node._uri.storage_index

        d = conn.callRemote("slot_testv_and_readv_and_writev",
                            storage_index,
                            secrets,
                            tw_vectors,
                            read_vector)
        return d

    def _got_write_answer(self, answer, tw_vectors, my_checkstring,
                          peerid, expected_old_shares,
                          dispatch_map):
        wrote, read_data = answer
        surprised = False
        if not wrote:
            # surprise! our testv failed, so the write did not happen
            surprised = True

        for shnum, (old_cs,) in read_data.items():
            (old_seqnum, old_root_hash, IV) = unpack_checkstring(old_cs)
            if wrote and shnum in tw_vectors:
                cur_cs = my_checkstring
            else:
                cur_cs = old_cs

            (cur_seqnum, cur_root_hash, IV) = unpack_checkstring(cur_cs)
            dispatch_map.add(shnum, (peerid, cur_seqnum, cur_root_hash))

            if shnum not in expected_old_shares:
                # surprise! there was a share we didn't know about
                surprised = True
            else:
                seqnum, root_hash = expected_old_shares[shnum]
                if seqnum is not None:
                    if seqnum != old_seqnum or root_hash != old_root_hash:
                        # surprise! somebody modified the share on us
                        surprised = True
        if surprised:
            self._surprised = True

    def _maybe_recover(self, (surprised, dispatch_map)):
        if not surprised:
            return
        print "RECOVERY NOT YET IMPLEMENTED"
        # but dispatch_map will help us do it
        raise UncoordinatedWriteError("I was surprised!")

# use client.create_mutable_file() to make one of these

class MutableFileNode:
    implements(IMutableFileNode)
    publish_class = Publish
    retrieve_class = Retrieve

    def __init__(self, client):
        self._client = client
        self._pubkey = None # filled in upon first read
        self._privkey = None # filled in if we're mutable
        self._required_shares = None # ditto
        self._total_shares = None # ditto
        self._sharemap = {} # known shares, shnum-to-[nodeids]

        self._current_data = None # SDMF: we're allowed to cache the contents
        self._current_roothash = None # ditto
        self._current_seqnum = None # ditto

    def init_from_uri(self, myuri):
        # we have the URI, but we have not yet retrieved the public
        # verification key, nor things like 'k' or 'N'. If and when someone
        # wants to get our contents, we'll pull from shares and fill those
        # in.
        self._uri = IMutableFileURI(myuri)
        self._writekey = self._uri.writekey
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        self._fingerprint = self._uri.fingerprint
        return self

    def create(self, initial_contents):
        """Call this when the filenode is first created. This will generate
        the keys, generate the initial shares, allocate shares, and upload
        the initial contents. Returns a Deferred that fires (with the
        MutableFileNode instance you should use) when it completes.
        """
        self._required_shares = 3
        self._total_shares = 10
        d = defer.maybeDeferred(self._generate_pubprivkeys)
        def _generated( (pubkey, privkey) ):
            self._pubkey, self._privkey = pubkey, privkey
            pubkey_s = self._pubkey.serialize()
            privkey_s = self._privkey.serialize()
            self._writekey = hashutil.ssk_writekey_hash(privkey_s)
            self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
            self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
            self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
            self._readkey = self._uri.readkey
            self._storage_index = self._uri.storage_index
            # TODO: seqnum/roothash: really we mean "doesn't matter since
            # nobody knows about us yet"
            self._current_seqnum = 0
            self._current_roothash = "\x00"*32
            return self._publish(initial_contents)
        d.addCallback(_generated)
        return d

    def _generate_pubprivkeys(self):
        # TODO: wire these up to pycryptopp
        privkey = "very private"
        pubkey = "public"
        return pubkey, privkey

    def _publish(self, initial_contents):
        p = self.publish_class(self)
        d = p.publish(initial_contents)
        d.addCallback(lambda res: self)
        return d

    def _encrypt_privkey(self, writekey, privkey):
        enc = AES.new(key=writekey, mode=AES.MODE_CTR, counterstart="\x00"*16)
        crypttext = enc.encrypt(privkey)
        return crypttext

    def get_write_enabler(self, peerid):
        assert len(peerid) == 20
        return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
    def get_renewal_secret(self, peerid):
        assert len(peerid) == 20
        crs = self._client.get_renewal_secret()
        frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
        return hashutil.bucket_renewal_secret_hash(frs, peerid)
    def get_cancel_secret(self, peerid):
        assert len(peerid) == 20
        ccs = self._client.get_cancel_secret()
        fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
        return hashutil.bucket_cancel_secret_hash(fcs, peerid)

    def get_writekey(self):
        return self._writekey
    def get_readkey(self):
        return self._readkey
    def get_storage_index(self):
        return self._storage_index
    def get_privkey(self):
        return self._privkey
    def get_encprivkey(self):
        return self._encprivkey
    def get_pubkey(self):
        return self._pubkey

    def get_required_shares(self):
        return self._required_shares
    def get_total_shares(self):
        return self._total_shares

    def get_uri(self):
        return self._uri.to_string()

    def is_mutable(self):
        return self._uri.is_mutable()

    def __hash__(self):
        return hash((self.__class__, self._uri))
    def __cmp__(self, them):
        if cmp(type(self), type(them)):
            return cmp(type(self), type(them))
        if cmp(self.__class__, them.__class__):
            return cmp(self.__class__, them.__class__)
        return cmp(self._uri, them._uri)

    def get_verifier(self):
        return IMutableFileURI(self._uri).get_verifier()

    def check(self):
        verifier = self.get_verifier()
        return self._client.getServiceNamed("checker").check(verifier)

    def download(self, target):
        #downloader = self._client.getServiceNamed("downloader")
        #return downloader.download(self.uri, target)
        raise NotImplementedError

    def download_to_data(self):
        #downloader = self._client.getServiceNamed("downloader")
        #return downloader.download_to_data(self.uri)
        return defer.succeed("this isn't going to fool you, is it")

    def replace(self, newdata):
        return defer.succeed(None)