1
2 import os, struct
3 from itertools import islice
4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.python import failure
7 from foolscap.eventual import eventually
8 from allmydata.interfaces import IMutableFileNode, IMutableFileURI
9 from allmydata.util import hashutil, mathutil, idlib
10 from allmydata.uri import WriteableSSKFileURI
11 from allmydata.Crypto.Cipher import AES
12 from allmydata import hashtree, codec
13 from allmydata.encode import NotEnoughPeersError
14
15
16 class NeedMoreDataError(Exception):
17     def __init__(self, needed_bytes):
18         Exception.__init__(self)
19         self.needed_bytes = needed_bytes
20
21 class UncoordinatedWriteError(Exception):
22     pass
23
24 class CorruptShareError(Exception):
25     def __init__(self, peerid, shnum, reason):
26         self.peerid = peerid
27         self.shnum = shnum
28         self.reason = reason
29     def __repr__(self):
30         short_peerid = idlib.nodeid_b2a(self.peerid)[:8]
        return "<CorruptShareError peerid=%s shnum[%d]: %s>" % (short_peerid,
                                                                 self.shnum,
                                                                 self.reason)
34
35 PREFIX = ">BQ32s16s" # each version has a different prefix
36 SIGNED_PREFIX = ">BQ32s16s BBQQ" # this is covered by the signature
37 HEADER = ">BQ32s16s BBQQ LLLLQQ" # includes offsets
38 HEADER_LENGTH = struct.calcsize(HEADER)
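
# The resulting layout, implied by the struct formats above and the field
# names used in unpack_share() below:
#
#   offset  size  field
#        0     1  version byte (currently 0)
#        1     8  sequence number
#        9    32  root hash
#       41    16  IV                     (end of PREFIX, the "checkstring")
#       57     1  k (required shares)
#       58     1  N (total shares)
#       59     8  segment size
#       67     8  data length            (end of SIGNED_PREFIX, 75 bytes)
#       75     4  offset of signature
#       79     4  offset of share hash chain
#       83     4  offset of block hash tree
#       87     4  offset of share data
#       91     8  offset of encrypted private key
#       99     8  offset of EOF          (end of HEADER, HEADER_LENGTH == 107)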
39
40 def unpack_prefix_and_signature(data):
41     assert len(data) >= HEADER_LENGTH
42     o = {}
43     prefix = data[:struct.calcsize(SIGNED_PREFIX)]
44
45     (version,
46      seqnum,
47      root_hash,
48      IV,
49      k, N, segsize, datalen,
50      o['signature'],
51      o['share_hash_chain'],
52      o['block_hash_tree'],
53      o['share_data'],
54      o['enc_privkey'],
55      o['EOF']) = struct.unpack(HEADER, data[:HEADER_LENGTH])
56
57     assert version == 0
58     if len(data) < o['share_hash_chain']:
59         raise NeedMoreDataError(o['share_hash_chain'])
60
61     pubkey_s = data[HEADER_LENGTH:o['signature']]
62     signature = data[o['signature']:o['share_hash_chain']]
63
64     return (seqnum, root_hash, IV, k, N, segsize, datalen,
65             pubkey_s, signature, prefix)
66
67 def unpack_share(data):
68     assert len(data) >= HEADER_LENGTH
69     o = {}
70     (version,
71      seqnum,
72      root_hash,
73      IV,
74      k, N, segsize, datalen,
75      o['signature'],
76      o['share_hash_chain'],
77      o['block_hash_tree'],
78      o['share_data'],
79      o['enc_privkey'],
80      o['EOF']) = struct.unpack(HEADER, data[:HEADER_LENGTH])
81
82     assert version == 0
83     if len(data) < o['EOF']:
84         raise NeedMoreDataError(o['EOF'])
85
86     pubkey = data[HEADER_LENGTH:o['signature']]
87     signature = data[o['signature']:o['share_hash_chain']]
88     share_hash_chain_s = data[o['share_hash_chain']:o['block_hash_tree']]
89     share_hash_format = ">H32s"
90     hsize = struct.calcsize(share_hash_format)
91     assert len(share_hash_chain_s) % hsize == 0, len(share_hash_chain_s)
92     share_hash_chain = []
93     for i in range(0, len(share_hash_chain_s), hsize):
94         chunk = share_hash_chain_s[i:i+hsize]
95         (hid, h) = struct.unpack(share_hash_format, chunk)
96         share_hash_chain.append( (hid, h) )
97     block_hash_tree_s = data[o['block_hash_tree']:o['share_data']]
98     assert len(block_hash_tree_s) % 32 == 0, len(block_hash_tree_s)
99     block_hash_tree = []
100     for i in range(0, len(block_hash_tree_s), 32):
101         block_hash_tree.append(block_hash_tree_s[i:i+32])
102
103     share_data = data[o['share_data']:o['enc_privkey']]
104     enc_privkey = data[o['enc_privkey']:o['EOF']]
105
106     return (seqnum, root_hash, IV, k, N, segsize, datalen,
107             pubkey, signature, share_hash_chain, block_hash_tree,
108             share_data, enc_privkey)
109
110
111 def pack_checkstring(seqnum, root_hash, IV):
112     return struct.pack(PREFIX,
113                        0, # version,
114                        seqnum,
115                        root_hash,
116                        IV)
117
118 def unpack_checkstring(checkstring):
119     cs_len = struct.calcsize(PREFIX)
120     version, seqnum, root_hash, IV = struct.unpack(PREFIX, checkstring[:cs_len])
121     assert version == 0 # TODO: just ignore the share
122     return (seqnum, root_hash, IV)
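
# (The checkstring is just the PREFIX portion of a share: version, seqnum,
# root_hash and IV. Publish uses it in its test-and-read vectors to detect
# whether somebody else has written a newer version.)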
123
124 def pack_prefix(seqnum, root_hash, IV,
125                 required_shares, total_shares,
126                 segment_size, data_length):
127     prefix = struct.pack(SIGNED_PREFIX,
128                          0, # version,
129                          seqnum,
130                          root_hash,
131                          IV,
132
133                          required_shares,
134                          total_shares,
135                          segment_size,
136                          data_length,
137                          )
138     return prefix
139
140 def pack_offsets(verification_key_length, signature_length,
141                  share_hash_chain_length, block_hash_tree_length,
142                  share_data_length, encprivkey_length):
143     post_offset = HEADER_LENGTH
144     offsets = {}
145     o1 = offsets['signature'] = post_offset + verification_key_length
146     o2 = offsets['share_hash_chain'] = o1 + signature_length
147     o3 = offsets['block_hash_tree'] = o2 + share_hash_chain_length
148     o4 = offsets['share_data'] = o3 + block_hash_tree_length
149     o5 = offsets['enc_privkey'] = o4 + share_data_length
150     o6 = offsets['EOF'] = o5 + encprivkey_length
151
152     return struct.pack(">LLLLQQ",
153                        offsets['signature'],
154                        offsets['share_hash_chain'],
155                        offsets['block_hash_tree'],
156                        offsets['share_data'],
157                        offsets['enc_privkey'],
158                        offsets['EOF'])
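
# A complete share, as assembled by Publish._generate_shares() below, is:
#
#   [SIGNED_PREFIX][offsets][verification_key][signature]
#   [share_hash_chain][block_hash_tree][share_data][enc_privkey]
#
# Each of the six offsets packed above is measured from the start of the
# share, so a reader can slice out any field directly, e.g.
# data[offsets['share_data']:offsets['enc_privkey']] is the share data;
# this is exactly what unpack_share() does.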
159
160 class Retrieve:
161     def __init__(self, filenode):
162         self._node = filenode
163         self._contents = None
164         # if the filenode already has a copy of the pubkey, use it. Otherwise
165         # we'll grab a copy from the first peer we talk to.
166         self._pubkey = filenode.get_pubkey()
167         self._storage_index = filenode.get_storage_index()
168         self._readkey = filenode.get_readkey()
169
170     def log(self, msg):
171         self._node._client.log(msg)
172
173     def retrieve(self):
174         """Retrieve the filenode's current contents. Returns a Deferred that
175         fires with a string when the contents have been retrieved."""
176
177         # 1: make a guess as to how many peers we should send requests to. We
178         #    want to hear from k+EPSILON (k because we have to, EPSILON extra
179         #    because that helps us resist rollback attacks). [TRADEOFF:
180         #    EPSILON>0 means extra work] [TODO: implement EPSILON>0]
181         # 2: build the permuted peerlist, taking the first k+E peers
182         # 3: send readv requests to all of them in parallel, asking for the
183         #    first 2KB of data from all shares
184         # 4: when the first of the responses comes back, extract information:
185         # 4a: extract the pubkey, hash it, compare against the URI. If this
186         #     check fails, log a WEIRD and ignore the peer.
187         # 4b: extract the prefix (seqnum, roothash, k, N, segsize, datalength)
188         #     and verify the signature on it. If this is wrong, log a WEIRD
189         #     and ignore the peer. Save the prefix string in a dict that's
190         #     keyed by (seqnum,roothash) and has (prefixstring, sharemap) as
191         #     values. We'll use the prefixstring again later to avoid doing
192         #     multiple signature checks
193         # 4c: extract the share size (offset of the last byte of sharedata).
194         #     if it is larger than 2k, send new readv requests to pull down
195         #     the extra data
196         # 4d: if the extracted 'k' is more than we guessed, rebuild a larger
197         #     permuted peerlist and send out more readv requests.
198         # 5: as additional responses come back, extract the prefix and compare
199         #    against the ones we've already seen. If they match, add the
        #    peerid to the corresponding sharemap dict
201         # 6: [TRADEOFF]: if EPSILON==0, when we get k responses for the
202         #    same (seqnum,roothash) key, attempt to reconstruct that data.
203         #    if EPSILON>0, wait for k+EPSILON responses, then attempt to
        #    reconstruct the most popular version. If we do not have enough
205         #    shares and there are still requests outstanding, wait. If there
206         #    are not still requests outstanding (todo: configurable), send
207         #    more requests. Never send queries to more than 2*N servers. If
208         #    we've run out of servers, fail.
209         # 7: if we discover corrupt shares during the reconstruction process,
        #    remove that share from the sharemap and start step #6 again.
211
212         initial_query_count = 5
213         self._read_size = 2000
214
215         # we might not know how many shares we need yet.
216         self._required_shares = self._node.get_required_shares()
217         self._total_shares = self._node.get_total_shares()
218         self._segsize = None
219         self._datalength = None
220
221         d = defer.succeed(initial_query_count)
222         d.addCallback(self._choose_initial_peers)
223         d.addCallback(self._send_initial_requests)
224         d.addCallback(lambda res: self._contents)
225         return d
226
227     def _choose_initial_peers(self, numqueries):
228         n = self._node
229         full_peerlist = n._client.get_permuted_peers(self._storage_index,
230                                                      include_myself=True)
231         # _peerlist is a list of (peerid,conn) tuples for peers that are
        # worth talking to. This starts with the first numqueries in the
233         # permuted list. If that's not enough to get us a recoverable
234         # version, we expand this to include the first 2*total_shares peerids
235         # (assuming we learn what total_shares is from one of the first
236         # numqueries peers)
237         self._peerlist = [(p[1],p[2])
238                           for p in islice(full_peerlist, numqueries)]
239         # _peerlist_limit is the query limit we used to build this list. If
240         # we later increase this limit, it may be useful to re-scan the
241         # permuted list.
242         self._peerlist_limit = numqueries
243         return self._peerlist
244
245     def _send_initial_requests(self, peerlist):
        self._bad_peerids = set()
        self._running = True
        self._queries_outstanding = set()
        self._used_peers = set()
        self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
        self._valid_versions = {} # maps (seqnum,root_hash,IV) -> (prefix, sharemap)
        self._peer_storage_servers = {}
251         dl = []
        for (peerid, conn) in peerlist:
253             self._queries_outstanding.add(peerid)
254             self._do_query(conn, peerid, self._storage_index, self._read_size,
255                            self._peer_storage_servers)
256
257         # control flow beyond this point: state machine. Receiving responses
258         # from queries is the input. We might send out more queries, or we
259         # might produce a result.
260
261         d = self._done_deferred = defer.Deferred()
262         return d
263
264     def _do_query(self, conn, peerid, storage_index, readsize,
265                   peer_storage_servers):
266         self._queries_outstanding.add(peerid)
267         if peerid in peer_storage_servers:
268             d = defer.succeed(peer_storage_servers[peerid])
269         else:
270             d = conn.callRemote("get_service", "storageserver")
271             def _got_storageserver(ss):
272                 peer_storage_servers[peerid] = ss
273                 return ss
274             d.addCallback(_got_storageserver)
275         d.addCallback(lambda ss: ss.callRemote("readv_slots", [(0, readsize)]))
276         d.addCallback(self._got_results, peerid, readsize)
277         d.addErrback(self._query_failed, peerid, (conn, storage_index,
278                                                   peer_storage_servers))
279         return d
280
281     def _deserialize_pubkey(self, pubkey_s):
282         # TODO
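        # A sketch of what this might look like, assuming the pycryptopp
        # RSA API (not wired up yet):
        #     from pycryptopp.publickey import rsa
        #     return rsa.create_verifying_key_from_string(pubkey_s)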
283         return None
284
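    # What full share validation should eventually do (a sketch, based on
    # how shares are built in Publish._generate_shares below; the exact
    # hashtree calls are an assumption):
    #   - unpack the share and compute hashutil.block_hash(share_data)
    #   - check that hash against the share's block_hash_tree, and check
    #     that the block hash tree's root, combined with this share's
    #     share_hash_chain, hashes up to root_hash (e.g. with
    #     hashtree.IncompleteHashTree)
    #   - raise CorruptShareError on any mismatch; the caller catches it
    #     and discards the share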
    def _validate_share(self, root_hash, shnum, data):
        # TODO: the hash checks described above are not implemented yet.
        # For now, accept the share and return its data block unchecked.
        (seqnum, root_hash_s, IV, k, N, segsize, datalen,
         pubkey, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = unpack_share(data)
        return share_data
289
290     def _got_results(self, datavs, peerid, readsize):
291         self._queries_outstanding.discard(peerid)
292         self._used_peers.add(peerid)
293         if not self._running:
294             return
295
296         for shnum,datav in datavs.items():
297             data = datav[0]
298             (seqnum, root_hash, IV, k, N, segsize, datalength,
299              pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
300
301             if not self._pubkey:
302                 fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
303                 if fingerprint != self._node._fingerprint:
304                     # bad share
                    raise CorruptShareError(peerid, shnum,
                                            "pubkey doesn't match fingerprint")
307                 self._pubkey = self._deserialize_pubkey(pubkey_s)
308
309             verinfo = (seqnum, root_hash, IV)
310             if verinfo not in self._valid_versions:
311                 # it's a new pair. Verify the signature.
312                 valid = self._pubkey.verify(prefix, signature)
313                 if not valid:
                    raise CorruptShareError(peerid, shnum,
                                            "signature is invalid")
316                 # ok, it's a valid verinfo. Add it to the list of validated
317                 # versions.
318                 self._valid_versions[verinfo] = (prefix, DictOfSets())
319
320                 # and make a note of the other parameters we've just learned
321                 if self._required_shares is None:
322                     self._required_shares = k
323                 if self._total_shares is None:
324                     self._total_shares = N
325                 if self._segsize is None:
326                     self._segsize = segsize
327                 if self._datalength is None:
328                     self._datalength = datalength
329
            # at this point the (seqnum, root_hash, IV) triple has a
            # validated signature, so this share is a valid candidate.
            # Accumulate the share info, if there's enough data present. If
            # not, raise NeedMoreDataError, which will trigger a re-fetch.
334             _ignored = unpack_share(data)
335             self._valid_versions[verinfo][1].add(shnum, (peerid, data))
336
337         self._check_for_done()
338
339
340     def _query_failed(self, f, peerid, stuff):
341         self._queries_outstanding.discard(peerid)
342         self._used_peers.add(peerid)
343         if not self._running:
344             return
345         if f.check(NeedMoreDataError):
346             # ah, just re-send the query then.
            self._read_size = max(self._read_size, f.value.needed_bytes)
348             (conn, storage_index, peer_storage_servers) = stuff
349             self._do_query(conn, peerid, storage_index, self._read_size,
350                            peer_storage_servers)
351             return
352         self._bad_peerids.add(peerid)
        short_sid = idlib.b2a(self._storage_index)[:6]
354         if f.check(CorruptShareError):
355             self.log("WEIRD: bad share for %s: %s" % (short_sid, f))
356         else:
357             self.log("WEIRD: other error for %s: %s" % (short_sid, f))
358         self._check_for_done()
359
360     def _check_for_done(self):
361         share_prefixes = {}
362         versionmap = DictOfSets()
363         for verinfo, (prefix, sharemap) in self._valid_versions.items():
364             if len(sharemap) >= self._required_shares:
365                 # this one looks retrievable
366                 d = defer.maybeDeferred(self._extract_data, verinfo, sharemap)
367                 def _problem(f):
368                     if f.check(CorruptShareError):
369                         # log(WEIRD)
370                         # _extract_data is responsible for removing the bad
371                         # share, so we can just try again
372                         eventually(self._check_for_done)
373                         return
374                     return f
375                 d.addCallbacks(self._done, _problem)
376                 return
377
378         # we don't have enough shares yet. Should we send out more queries?
379         if self._queries_outstanding:
380             # there are some running, so just wait for them to come back.
381             # TODO: if our initial guess at k was too low, waiting for these
382             # responses before sending new queries will increase our latency,
383             # so we could speed things up by sending new requests earlier.
384             return
385
386         # no more queries are outstanding. Can we send out more? First,
387         # should we be looking at more peers?
388         if self._total_shares is not None:
389             search_distance = self._total_shares * 2
390         else:
391             search_distance = 20
392         if self._peerlist_limit < search_distance:
393             # we might be able to get some more peers from the list
394             peers = self._node._client.get_permuted_peers(self._storage_index,
395                                                           include_myself=True)
396             self._peerlist = [(p[1],p[2])
397                               for p in islice(peers, search_distance)]
398             self._peerlist_limit = search_distance
399         # are there any peers on the list that we haven't used?
400         new_query_peers = []
401         for (peerid, conn) in self._peerlist:
402             if peerid not in self._used_peers:
403                 new_query_peers.append( (peerid, conn) )
                if len(new_query_peers) >= 5:
405                     # only query in batches of 5. TODO: this is pretty
406                     # arbitrary, really I want this to be something like
407                     # k - max(known_version_sharecounts) + some extra
408                     break
409         if new_query_peers:
410             for (peerid, conn) in new_query_peers:
411                 self._do_query(conn, peerid,
412                                self._storage_index, self._read_size,
413                                self._peer_storage_servers)
414             # we'll retrigger when those queries come back
415             return
416
417         # we've used up all the peers we're allowed to search. Failure.
418         return self._done(failure.Failure(NotEnoughPeersError()))
419
420     def _extract_data(self, verinfo, sharemap):
421         # sharemap is a dict which maps shnum to [(peerid,data)..] sets.
422         (seqnum, root_hash, IV) = verinfo
423
424         # first, validate each share that we haven't validated yet. We use
425         # self._valid_shares to remember which ones we've already checked.
426
        self._valid_shares = set()  # set of (peerid,data) tuples
        shares = {}
        for shnum, shareinfos in sharemap.items():
            # each shnum maps to a set of (peerid,data) tuples; any one
            # valid copy of this share will do
            for shareinfo in shareinfos:
                if shareinfo in self._valid_shares:
                    continue
                (peerid, data) = shareinfo
                try:
                    # The (seqnum+root_hash+IV) tuple for this share was
                    # already verified: specifically, all shares in the
                    # sharemap have a (seqnum+root_hash+IV) pair that was
                    # present in a validly signed prefix. The remainder of
                    # the prefix for this particular share has *not* been
                    # validated, but we don't care since we don't use it.
                    # self._validate_share() is required to check the hashes
                    # on the share data (and hash chains) to make sure they
                    # match root_hash, but is not required (and is in fact
                    # prohibited, because we don't validate the prefix on all
                    # shares) from using anything else in the share.
                    sharedata = self._validate_share(root_hash, shnum, data)
                except CorruptShareError, e:
                    self.log("WEIRD: share was corrupt: %s" % e)
                    sharemap[shnum].discard(shareinfo)
                    # If there are enough remaining shares, _check_for_done()
                    # will try again
                    raise
                self._valid_shares.add(shareinfo)
                shares[shnum] = sharedata
453         # at this point, all shares in the sharemap are valid, and they're
454         # all for the same seqnum+root_hash version, so it's now down to
455         # doing FEC and decrypt.
456         d = defer.maybeDeferred(self._decode, shares)
457         d.addCallback(self._decrypt, IV)
458         return d
459
460     def _decode(self, shares_dict):
461         # shares_dict is a dict mapping shnum to share data, but the codec
462         # wants two lists.
463         shareids = []; shares = []
464         for shareid, share in shares_dict.items():
465             shareids.append(shareid)
466             shares.append(share)
467
468         fec = codec.CRSDecoder()
469         # we ought to know these values by now
470         assert self._segsize is not None
471         assert self._required_shares is not None
472         assert self._total_shares is not None
473         params = "%d-%d-%d" % (self._segsize,
474                                self._required_shares, self._total_shares)
475         fec.set_serialized_params(params)
476
477         d = fec.decode(shares, shareids)
478         def _done(buffers):
479             segment = "".join(buffers)
480             segment = segment[:self._datalength]
481             return segment
482         d.addCallback(_done)
483         return d
484
485     def _decrypt(self, crypttext, IV):
486         key = hashutil.ssk_readkey_data_hash(IV, self._readkey)
487         decryptor = AES.new(key=key, mode=AES.MODE_CTR, counterstart="\x00"*16)
488         plaintext = decryptor.decrypt(crypttext)
489         return plaintext
490
    def _done(self, contents):
        self._running = False
        self._contents = contents
        eventually(self._done_deferred.callback, contents)
494
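# Intended usage (a sketch; see MutableFileNode.retrieve_class):
#   r = Retrieve(filenode)
#   d = r.retrieve()
#   d.addCallback(lambda contents: ...)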
495
496
497 class DictOfSets(dict):
498     def add(self, key, value):
499         if key in self:
500             self[key].add(value)
501         else:
502             self[key] = set([value])
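    # Example:
    #   d = DictOfSets()
    #   d.add(3, ("peeridA", 5, "R"))
    #   d.add(3, ("peeridB", 5, "R"))
    #   d[3] is now set([("peeridA", 5, "R"), ("peeridB", 5, "R")])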
503
504 class Publish:
505     """I represent a single act of publishing the mutable file to the grid."""
506
507     def __init__(self, filenode):
508         self._node = filenode
509
510     def publish(self, newdata):
511         """Publish the filenode's current contents. Returns a Deferred that
512         fires (with None) when the publish has done as much work as it's ever
513         going to do, or errbacks with ConsistencyError if it detects a
514         simultaneous write."""
515
516         # 1: generate shares (SDMF: files are small, so we can do it in RAM)
517         # 2: perform peer selection, get candidate servers
518         #  2a: send queries to n+epsilon servers, to determine current shares
519         #  2b: based upon responses, create target map
520         # 3: send slot_testv_and_readv_and_writev messages
521         # 4: as responses return, update share-dispatch table
522         # 4a: may need to run recovery algorithm
523         # 5: when enough responses are back, we're done
524
525         old_roothash = self._node._current_roothash
526         old_seqnum = self._node._current_seqnum
527
528         readkey = self._node.get_readkey()
529         required_shares = self._node.get_required_shares()
530         total_shares = self._node.get_total_shares()
531         privkey = self._node.get_privkey()
532         encprivkey = self._node.get_encprivkey()
533         pubkey = self._node.get_pubkey()
534
535         IV = os.urandom(16)
536
537         d = defer.succeed(newdata)
538         d.addCallback(self._encrypt_and_encode, readkey, IV,
539                       required_shares, total_shares)
540         d.addCallback(self._generate_shares, old_seqnum+1,
541                       privkey, encprivkey, pubkey)
542
543         d.addCallback(self._query_peers, total_shares)
544         d.addCallback(self._send_shares, IV)
545         d.addCallback(self._maybe_recover)
546         d.addCallback(lambda res: None)
547         return d
548
549     def _encrypt_and_encode(self, newdata, readkey, IV,
550                             required_shares, total_shares):
551         key = hashutil.ssk_readkey_data_hash(IV, readkey)
552         enc = AES.new(key=key, mode=AES.MODE_CTR, counterstart="\x00"*16)
553         crypttext = enc.encrypt(newdata)
554
555         # now apply FEC
556         self.MAX_SEGMENT_SIZE = 1024*1024
557         segment_size = min(self.MAX_SEGMENT_SIZE, len(crypttext))
558         # this must be a multiple of self.required_shares
559         segment_size = mathutil.next_multiple(segment_size,
560                                                    required_shares)
561         self.num_segments = mathutil.div_ceil(len(crypttext), segment_size)
562         assert self.num_segments == 1 # SDMF restrictions
563         fec = codec.CRSEncoder()
564         fec.set_params(segment_size, required_shares, total_shares)
565         piece_size = fec.get_block_size()
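        # Illustrative numbers (assuming the CRS block size works out to
        # segment_size/required_shares): with required_shares=3,
        # total_shares=10, and a 10,000-byte crypttext, segment_size is
        # rounded up to 10002, piece_size is 3334, and each of the 10
        # resulting shares carries a single 3334-byte block.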
566         crypttext_pieces = []
567         for offset in range(0, len(crypttext), piece_size):
568             piece = crypttext[offset:offset+piece_size]
569             if len(piece) < piece_size:
570                 pad_size = piece_size - len(piece)
571                 piece = piece + "\x00"*pad_size
572             crypttext_pieces.append(piece)
573             assert len(piece) == piece_size
574
575         d = fec.encode(crypttext_pieces)
576         d.addCallback(lambda shares:
577                       (shares, required_shares, total_shares,
578                        segment_size, len(crypttext), IV) )
579         return d
580
581     def _generate_shares(self, (shares_and_shareids,
582                                 required_shares, total_shares,
583                                 segment_size, data_length, IV),
584                          seqnum, privkey, encprivkey, pubkey):
585
586         (shares, share_ids) = shares_and_shareids
587
588         assert len(shares) == len(share_ids)
589         assert len(shares) == total_shares
590         all_shares = {}
591         block_hash_trees = {}
592         share_hash_leaves = [None] * len(shares)
593         for i in range(len(shares)):
594             share_data = shares[i]
595             shnum = share_ids[i]
596             all_shares[shnum] = share_data
597
598             # build the block hash tree. SDMF has only one leaf.
599             leaves = [hashutil.block_hash(share_data)]
600             t = hashtree.HashTree(leaves)
601             block_hash_trees[shnum] = block_hash_tree = list(t)
602             share_hash_leaves[shnum] = t[0]
603         for leaf in share_hash_leaves:
604             assert leaf is not None
605         share_hash_tree = hashtree.HashTree(share_hash_leaves)
606         share_hash_chain = {}
607         for shnum in range(total_shares):
608             needed_hashes = share_hash_tree.needed_hashes(shnum)
609             share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
610                                               for i in needed_hashes ] )
611         root_hash = share_hash_tree[0]
612         assert len(root_hash) == 32
613
614         prefix = pack_prefix(seqnum, root_hash, IV,
615                              required_shares, total_shares,
616                              segment_size, data_length)
617
        # now pack the beginning of the share. All shares are the same up
        # to the signature, then they have divergent share hash chains,
        # then completely different block hash trees and share data, then
        # they all share the same encprivkey at the end. The sizes of
        # everything are the same for all shares.
623
624         signature = privkey.sign(prefix)
625
626         verification_key = pubkey.serialize()
627
628         final_shares = {}
629         for shnum in range(total_shares):
630             shc = share_hash_chain[shnum]
631             share_hash_chain_s = "".join([struct.pack(">H32s", i, shc[i])
632                                           for i in sorted(shc.keys())])
633             bht = block_hash_trees[shnum]
634             for h in bht:
635                 assert len(h) == 32
636             block_hash_tree_s = "".join(bht)
637             share_data = all_shares[shnum]
638             offsets = pack_offsets(len(verification_key),
639                                    len(signature),
640                                    len(share_hash_chain_s),
641                                    len(block_hash_tree_s),
642                                    len(share_data),
643                                    len(encprivkey))
644
645             final_shares[shnum] = "".join([prefix,
646                                            offsets,
647                                            verification_key,
648                                            signature,
649                                            share_hash_chain_s,
650                                            block_hash_tree_s,
651                                            share_data,
652                                            encprivkey])
653         return (seqnum, root_hash, final_shares)
654
655
656     def _query_peers(self, (seqnum, root_hash, final_shares), total_shares):
657         self._new_seqnum = seqnum
658         self._new_root_hash = root_hash
659         self._new_shares = final_shares
660
661         storage_index = self._node.get_storage_index()
662         peerlist = self._node._client.get_permuted_peers(storage_index,
663                                                          include_myself=False)
664         # we don't include ourselves in the N peers, but we *do* push an
665         # extra copy of share[0] to ourselves so we're more likely to have
666         # the signing key around later. This way, even if all the servers die
667         # and the directory contents are unrecoverable, at least we can still
668         # push out a new copy with brand-new contents.
669         # TODO: actually push this copy
670
671         current_share_peers = DictOfSets()
672         reachable_peers = {}
673
674         EPSILON = total_shares / 2
675         partial_peerlist = islice(peerlist, total_shares + EPSILON)
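        # (e.g. with the 3-of-10 encoding set up by MutableFileNode.create,
        # this queries the first 10+5=15 peers in the permuted list)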
676         peer_storage_servers = {}
677         dl = []
678         for (permutedid, peerid, conn) in partial_peerlist:
679             d = self._do_query(conn, peerid, peer_storage_servers)
680             d.addCallback(self._got_query_results,
681                           peerid, permutedid,
682                           reachable_peers, current_share_peers)
683             dl.append(d)
684         d = defer.DeferredList(dl)
685         d.addCallback(self._got_all_query_results,
686                       total_shares, reachable_peers, seqnum,
687                       current_share_peers, peer_storage_servers)
        # TODO: add an errback to the per-peer queries, probably to ignore
        # that peer
689         return d
690
691     def _do_query(self, conn, peerid, peer_storage_servers):
692         d = conn.callRemote("get_service", "storageserver")
693         def _got_storageserver(ss):
694             peer_storage_servers[peerid] = ss
695             return ss.callRemote("readv_slots", [(0, 2000)])
696         d.addCallback(_got_storageserver)
697         return d
698
699     def _got_query_results(self, datavs, peerid, permutedid,
700                            reachable_peers, current_share_peers):
701         assert isinstance(datavs, dict)
702         reachable_peers[peerid] = permutedid
703         for shnum, datav in datavs.items():
704             assert len(datav) == 1
705             data = datav[0]
706             r = unpack_share(data)
707             share = (shnum, r[0], r[1]) # shnum,seqnum,R
            current_share_peers.add(shnum, (peerid, r[0], r[1]))
709
710     def _got_all_query_results(self, res,
711                                total_shares, reachable_peers, new_seqnum,
712                                current_share_peers, peer_storage_servers):
713         # now that we know everything about the shares currently out there,
714         # decide where to place the new shares.
715
716         # if an old share X is on a node, put the new share X there too.
717         # TODO: 1: redistribute shares to achieve one-per-peer, by copying
718         #       shares from existing peers to new (less-crowded) ones. The
719         #       old shares must still be updated.
720         # TODO: 2: move those shares instead of copying them, to reduce future
721         #       update work
722
723         shares_needing_homes = range(total_shares)
724         target_map = DictOfSets() # maps shnum to set((peerid,oldseqnum,oldR))
725         shares_per_peer = DictOfSets()
726         for shnum in range(total_shares):
727             for oldplace in current_share_peers.get(shnum, []):
728                 (peerid, seqnum, R) = oldplace
729                 if seqnum >= new_seqnum:
730                     raise UncoordinatedWriteError()
731                 target_map.add(shnum, oldplace)
732                 shares_per_peer.add(peerid, shnum)
733                 if shnum in shares_needing_homes:
734                     shares_needing_homes.remove(shnum)
735
736         # now choose homes for the remaining shares. We prefer peers with the
737         # fewest target shares, then peers with the lowest permuted index. If
738         # there are no shares already in place, this will assign them
739         # one-per-peer in the normal permuted order.
740         while shares_needing_homes:
741             if not reachable_peers:
742                 raise NotEnoughPeersError("ran out of peers during upload")
743             shnum = shares_needing_homes.pop(0)
744             possible_homes = reachable_peers.keys()
745             possible_homes.sort(lambda a,b:
746                                 cmp( (len(shares_per_peer.get(a, [])),
747                                       reachable_peers[a]),
748                                      (len(shares_per_peer.get(b, [])),
749                                       reachable_peers[b]) ))
750             target_peerid = possible_homes[0]
751             target_map.add(shnum, (target_peerid, None, None) )
752             shares_per_peer.add(target_peerid, shnum)
753
754         assert not shares_needing_homes
755
756         return (target_map, peer_storage_servers)
757
758     def _send_shares(self, (target_map, peer_storage_servers), IV ):
759         # we're finally ready to send out our shares. If we encounter any
760         # surprises here, it's because somebody else is writing at the same
761         # time. (Note: in the future, when we remove the _query_peers() step
762         # and instead speculate about [or remember] which shares are where,
763         # surprises here are *not* indications of UncoordinatedWriteError,
        # and we'll need to respond to them more gracefully.)
765
766         my_checkstring = pack_checkstring(self._new_seqnum,
767                                           self._new_root_hash, IV)
768         peer_messages = {}
769         expected_old_shares = {}
770
771         for shnum, peers in target_map.items():
772             for (peerid, old_seqnum, old_root_hash) in peers:
773                 testv = [(0, len(my_checkstring), "ge", my_checkstring)]
774                 new_share = self._new_shares[shnum]
775                 writev = [(0, new_share)]
776                 if peerid not in peer_messages:
777                     peer_messages[peerid] = {}
778                 peer_messages[peerid][shnum] = (testv, writev, None)
779                 if peerid not in expected_old_shares:
780                     expected_old_shares[peerid] = {}
781                 expected_old_shares[peerid][shnum] = (old_seqnum, old_root_hash)
782
783         read_vector = [(0, len(my_checkstring))]
784
785         dl = []
786         # ok, send the messages!
787         self._surprised = False
788         dispatch_map = DictOfSets()
789
790         for peerid, tw_vectors in peer_messages.items():
791
792             write_enabler = self._node.get_write_enabler(peerid)
793             renew_secret = self._node.get_renewal_secret(peerid)
794             cancel_secret = self._node.get_cancel_secret(peerid)
795             secrets = (write_enabler, renew_secret, cancel_secret)
796
797             d = self._do_testreadwrite(peerid, peer_storage_servers, secrets,
798                                        tw_vectors, read_vector)
799             d.addCallback(self._got_write_answer, tw_vectors, my_checkstring,
800                           peerid, expected_old_shares[peerid], dispatch_map)
801             dl.append(d)
802
803         d = defer.DeferredList(dl)
804         d.addCallback(lambda res: (self._surprised, dispatch_map))
805         return d
806
807     def _do_testreadwrite(self, peerid, peer_storage_servers, secrets,
808                           tw_vectors, read_vector):
809         conn = peer_storage_servers[peerid]
810         storage_index = self._node._uri.storage_index
811
812         d = conn.callRemote("slot_testv_and_readv_and_writev",
813                             storage_index,
814                             secrets,
815                             tw_vectors,
816                             read_vector)
817         return d
818
819     def _got_write_answer(self, answer, tw_vectors, my_checkstring,
820                           peerid, expected_old_shares,
821                           dispatch_map):
822         wrote, read_data = answer
823         surprised = False
824
825         if not wrote:
826             # surprise! our testv failed, so the write did not happen
827             surprised = True
828
829         for shnum, (old_cs,) in read_data.items():
830             (old_seqnum, old_root_hash, IV) = unpack_checkstring(old_cs)
831             if wrote and shnum in tw_vectors:
832                 cur_cs = my_checkstring
833             else:
834                 cur_cs = old_cs
835
836             (cur_seqnum, cur_root_hash, IV) = unpack_checkstring(cur_cs)
837             dispatch_map.add(shnum, (peerid, cur_seqnum, cur_root_hash))
838
839             if shnum not in expected_old_shares:
840                 # surprise! there was a share we didn't know about
841                 surprised = True
842             else:
843                 seqnum, root_hash = expected_old_shares[shnum]
844                 if seqnum is not None:
845                     if seqnum != old_seqnum or root_hash != old_root_hash:
846                         # surprise! somebody modified the share on us
847                         surprised = True
848         if surprised:
849             self._surprised = True
850
851     def _maybe_recover(self, (surprised, dispatch_map)):
852         if not surprised:
853             return
854         print "RECOVERY NOT YET IMPLEMENTED"
855         # but dispatch_map will help us do it
856         raise UncoordinatedWriteError("I was surprised!")
857
858
859 # use client.create_mutable_file() to make one of these
860
861 class MutableFileNode:
862     implements(IMutableFileNode)
863     publish_class = Publish
864     retrieve_class = Retrieve
865
866     def __init__(self, client):
867         self._client = client
868         self._pubkey = None # filled in upon first read
869         self._privkey = None # filled in if we're mutable
870         self._required_shares = None # ditto
871         self._total_shares = None # ditto
872         self._sharemap = {} # known shares, shnum-to-[nodeids]
873
874         self._current_data = None # SDMF: we're allowed to cache the contents
875         self._current_roothash = None # ditto
876         self._current_seqnum = None # ditto
877
878     def init_from_uri(self, myuri):
879         # we have the URI, but we have not yet retrieved the public
880         # verification key, nor things like 'k' or 'N'. If and when someone
881         # wants to get our contents, we'll pull from shares and fill those
882         # in.
883         self._uri = IMutableFileURI(myuri)
884         self._writekey = self._uri.writekey
885         self._readkey = self._uri.readkey
886         self._storage_index = self._uri.storage_index
887         return self
888
889     def create(self, initial_contents):
890         """Call this when the filenode is first created. This will generate
891         the keys, generate the initial shares, allocate shares, and upload
892         the initial contents. Returns a Deferred that fires (with the
893         MutableFileNode instance you should use) when it completes.
894         """
895         self._required_shares = 3
896         self._total_shares = 10
897         d = defer.maybeDeferred(self._generate_pubprivkeys)
898         def _generated( (pubkey, privkey) ):
899             self._pubkey, self._privkey = pubkey, privkey
900             pubkey_s = self._pubkey.serialize()
901             privkey_s = self._privkey.serialize()
902             self._writekey = hashutil.ssk_writekey_hash(privkey_s)
903             self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
904             self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
905             self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
906             self._readkey = self._uri.readkey
907             self._storage_index = self._uri.storage_index
908             # TODO: seqnum/roothash: really we mean "doesn't matter since
909             # nobody knows about us yet"
910             self._current_seqnum = 0
911             self._current_roothash = "\x00"*32
912             return self._publish(initial_contents)
913         d.addCallback(_generated)
914         return d
915
916     def _generate_pubprivkeys(self):
917         # TODO: wire these up to pycryptopp
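        # A sketch of what that wiring might look like, assuming the
        # pycryptopp RSA signing-key API (which also provides the sign(),
        # verify() and serialize() methods used elsewhere in this file):
        #     from pycryptopp.publickey import rsa
        #     privkey = rsa.generate(2048)
        #     pubkey = privkey.get_verifying_key()
        #     return pubkey, privkey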
918         privkey = "very private"
919         pubkey = "public"
920         return pubkey, privkey
921
922     def _publish(self, initial_contents):
923         p = self.publish_class(self)
924         d = p.publish(initial_contents)
925         d.addCallback(lambda res: self)
926         return d
927
928     def _encrypt_privkey(self, writekey, privkey):
929         enc = AES.new(key=writekey, mode=AES.MODE_CTR, counterstart="\x00"*16)
930         crypttext = enc.encrypt(privkey)
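        # (AES-CTR is its own inverse, so running the same writekey-keyed
        # transformation over this crypttext recovers the private key)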
931         return crypttext
932
933     def get_write_enabler(self, peerid):
934         assert len(peerid) == 20
935         return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
936     def get_renewal_secret(self, peerid):
937         assert len(peerid) == 20
938         crs = self._client.get_renewal_secret()
939         frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
940         return hashutil.bucket_renewal_secret_hash(frs, peerid)
941     def get_cancel_secret(self, peerid):
942         assert len(peerid) == 20
943         ccs = self._client.get_cancel_secret()
944         fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
945         return hashutil.bucket_cancel_secret_hash(fcs, peerid)
946
947     def get_writekey(self):
948         return self._writekey
949     def get_readkey(self):
950         return self._readkey
951     def get_storage_index(self):
952         return self._storage_index
953     def get_privkey(self):
954         return self._privkey
955     def get_encprivkey(self):
956         return self._encprivkey
957     def get_pubkey(self):
958         return self._pubkey
959
960     def get_required_shares(self):
961         return self._required_shares
962     def get_total_shares(self):
963         return self._total_shares
964
965
966     def get_uri(self):
967         return self._uri.to_string()
968
969     def is_mutable(self):
970         return self._uri.is_mutable()
971
    def __hash__(self):
        return hash((self.__class__, self._uri))
    def __cmp__(self, them):
        if cmp(type(self), type(them)):
            return cmp(type(self), type(them))
        if cmp(self.__class__, them.__class__):
            return cmp(self.__class__, them.__class__)
        return cmp(self._uri, them._uri)
980
981     def get_verifier(self):
982         return IMutableFileURI(self._uri).get_verifier()
983
984     def check(self):
985         verifier = self.get_verifier()
986         return self._client.getServiceNamed("checker").check(verifier)
987
988     def download(self, target):
989         #downloader = self._client.getServiceNamed("downloader")
990         #return downloader.download(self.uri, target)
991         raise NotImplementedError
992
993     def download_to_data(self):
994         #downloader = self._client.getServiceNamed("downloader")
995         #return downloader.download_to_data(self.uri)
996         return defer.succeed("this isn't going to fool you, is it")
997
998     def replace(self, newdata):
999         return defer.succeed(None)