]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable.py
webish: add primitive publish/retrieve status pages
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable.py
1
2 import os, struct, time, weakref
3 from itertools import islice, count
4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.python import failure
7 from twisted.application import service
8 from foolscap.eventual import eventually
9 from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \
10      IPublishStatus, IRetrieveStatus
11 from allmydata.util import base32, hashutil, mathutil, idlib, log
12 from allmydata.uri import WriteableSSKFileURI
13 from allmydata import hashtree, codec, storage
14 from allmydata.encode import NotEnoughPeersError
15 from pycryptopp.publickey import rsa
16 from pycryptopp.cipher.aes import AES
17
18
class NotMutableError(Exception):
    """Exception type declared by this module; it carries no extra state."""
21
class NeedMoreDataError(Exception):
    """Signals that a share buffer is too short to be parsed.

    Attributes:
      needed_bytes: how many bytes (counted from the start of the share)
                    the reader should fetch before trying again.
      encprivkey_offset: offset of the encrypted private key in the share.
      encprivkey_length: length of the encrypted private key.
    """

    def __init__(self, needed_bytes, encprivkey_offset, encprivkey_length):
        # The base class is deliberately initialised with no arguments, so
        # e.args stays empty; the details live in the attributes below.
        Exception.__init__(self)
        self.needed_bytes = needed_bytes
        self.encprivkey_offset = encprivkey_offset
        self.encprivkey_length = encprivkey_length

    def __str__(self):
        return "<NeedMoreDataError (%d bytes)>" % (self.needed_bytes,)
30
class UncoordinatedWriteError(Exception):
    """Exception whose repr() carries a user-facing explanation about
    uncoordinated writes."""

    def __repr__(self):
        # The message is one long literal, split across lines purely for
        # readability; adjacent string literals are concatenated.
        template = ("<%s -- You, oh user, tried to change a file or directory "
                    "at the same time as another process was trying to change "
                    "it.  To avoid data loss, don't do this.  Please see "
                    "docs/write_coordination.html for details.>")
        return template % (self.__class__.__name__,)
34
class CorruptShareError(Exception):
    """Raised when a share received from a peer fails validation.

    Attributes (also available, in this order, as e.args):
      peerid: binary node id of the peer that supplied the share.
      shnum: share number.
      reason: human-readable description of what was wrong.
    """

    def __init__(self, peerid, shnum, reason):
        # Passing the values to the base class sets self.args to
        # (peerid, shnum, reason).
        Exception.__init__(self, peerid, shnum, reason)
        self.peerid = peerid
        self.shnum = shnum
        self.reason = reason

    def __str__(self):
        # Only the first 8 characters of the printable node id are shown.
        short_peerid = idlib.nodeid_b2a(self.peerid)[:8]
        # The closing '>' was missing, leaving the angle brackets unbalanced.
        return "<CorruptShareError peerid=%s shnum[%d]: %s>" % (short_peerid,
                                                                self.shnum,
                                                                self.reason)
46
# struct layouts for the start of a share. Each layout extends the previous
# one (struct ignores the whitespace between format codes).
PREFIX = ">BQ32s16s" # each version has a different prefix
SIGNED_PREFIX = PREFIX + " BBQQ" # this is covered by the signature
HEADER = SIGNED_PREFIX + " LLLLQQ" # includes offsets
HEADER_LENGTH = struct.calcsize(HEADER)
51
def unpack_prefix_and_signature(data):
    """Parse the header, public key and signature from the start of a share.

    'data' must hold at least HEADER_LENGTH bytes. Returns the tuple
    (seqnum, root_hash, IV, k, N, segsize, datalen, pubkey_s, signature,
    prefix), where 'prefix' is the leading SIGNED_PREFIX-sized slice of
    'data'.

    Raises NeedMoreDataError when 'data' stops before the offset at which
    the share hash chain begins (i.e. the signature is not fully present).
    """
    assert len(data) >= HEADER_LENGTH
    prefix = data[:struct.calcsize(SIGNED_PREFIX)]

    fields = struct.unpack(HEADER, data[:HEADER_LENGTH])
    (version, seqnum, root_hash, IV, k, N, segsize, datalen) = fields[:8]
    # the remaining six header fields are the section offsets
    offset_names = ('signature', 'share_hash_chain', 'block_hash_tree',
                    'share_data', 'enc_privkey', 'EOF')
    o = dict(zip(offset_names, fields[8:]))

    assert version == 0
    signature_end = o['share_hash_chain']
    if len(data) < signature_end:
        raise NeedMoreDataError(signature_end,
                                o['enc_privkey'], o['EOF']-o['enc_privkey'])

    # the public key sits between the fixed header and the signature
    pubkey_s = data[HEADER_LENGTH:o['signature']]
    signature = data[o['signature']:signature_end]

    return (seqnum, root_hash, IV, k, N, segsize, datalen,
            pubkey_s, signature, prefix)
79
def unpack_share(data):
    """Split a complete share into its component fields.

    Returns the tuple (seqnum, root_hash, IV, k, N, segsize, datalen,
    pubkey, signature, share_hash_chain, block_hash_tree, share_data,
    enc_privkey). 'share_hash_chain' is a dict mapping hash number to
    32-byte hash; 'block_hash_tree' is a list of 32-byte hashes.

    Raises NeedMoreDataError when 'data' is shorter than the EOF offset
    recorded in the header.
    """
    assert len(data) >= HEADER_LENGTH
    fields = struct.unpack(HEADER, data[:HEADER_LENGTH])
    (version, seqnum, root_hash, IV, k, N, segsize, datalen) = fields[:8]
    offset_names = ('signature', 'share_hash_chain', 'block_hash_tree',
                    'share_data', 'enc_privkey', 'EOF')
    o = dict(zip(offset_names, fields[8:]))

    assert version == 0
    if len(data) < o['EOF']:
        raise NeedMoreDataError(o['EOF'],
                                o['enc_privkey'], o['EOF']-o['enc_privkey'])

    pubkey = data[HEADER_LENGTH:o['signature']]
    signature = data[o['signature']:o['share_hash_chain']]

    # share hash chain: a sequence of (2-byte hash number, 32-byte hash)
    # records
    share_hash_chain_s = data[o['share_hash_chain']:o['block_hash_tree']]
    share_hash_format = ">H32s"
    hsize = struct.calcsize(share_hash_format)
    assert len(share_hash_chain_s) % hsize == 0, len(share_hash_chain_s)
    share_hash_chain = {}
    for start in range(0, len(share_hash_chain_s), hsize):
        record = share_hash_chain_s[start:start+hsize]
        (hid, h) = struct.unpack(share_hash_format, record)
        share_hash_chain[hid] = h

    # block hash tree: a flat run of 32-byte hashes
    block_hash_tree_s = data[o['block_hash_tree']:o['share_data']]
    assert len(block_hash_tree_s) % 32 == 0, len(block_hash_tree_s)
    block_hash_tree = [block_hash_tree_s[start:start+32]
                       for start in range(0, len(block_hash_tree_s), 32)]

    share_data = data[o['share_data']:o['enc_privkey']]
    enc_privkey = data[o['enc_privkey']:o['EOF']]

    return (seqnum, root_hash, IV, k, N, segsize, datalen,
            pubkey, signature, share_hash_chain, block_hash_tree,
            share_data, enc_privkey)
124
125
def pack_checkstring(seqnum, root_hash, IV):
    """Serialize (seqnum, root_hash, IV) using the PREFIX layout, with the
    version field set to 0."""
    version = 0
    return struct.pack(PREFIX, version, seqnum, root_hash, IV)
132
def unpack_checkstring(checkstring):
    """Parse the PREFIX-sized start of 'checkstring' and return the tuple
    (seqnum, root_hash, IV). Asserts that the version field is 0."""
    prefix_bytes = checkstring[:struct.calcsize(PREFIX)]
    (version, seqnum, root_hash, IV) = struct.unpack(PREFIX, prefix_bytes)
    assert version == 0 # TODO: just ignore the share
    return (seqnum, root_hash, IV)
138
def pack_prefix(seqnum, root_hash, IV,
                required_shares, total_shares,
                segment_size, data_length):
    """Serialize the signed portion of the share header (the SIGNED_PREFIX
    layout), with the version field set to 0."""
    version = 0
    return struct.pack(SIGNED_PREFIX,
                       version, seqnum, root_hash, IV,
                       required_shares, total_shares,
                       segment_size, data_length)
154
def pack_offsets(verification_key_length, signature_length,
                 share_hash_chain_length, block_hash_tree_length,
                 share_data_length, encprivkey_length):
    """Build the packed offset table that follows the signed prefix.

    The six offsets recorded are, in order: signature, share_hash_chain,
    block_hash_tree, share_data, enc_privkey and EOF. Each one is the
    position at which that section starts, measured from the beginning of
    the share; the verification key starts right after the fixed header.
    """
    section_lengths = (verification_key_length, signature_length,
                       share_hash_chain_length, block_hash_tree_length,
                       share_data_length, encprivkey_length)
    boundaries = []
    position = HEADER_LENGTH
    for length in section_lengths:
        # the end of one section is the start of the next
        position += length
        boundaries.append(position)
    return struct.pack(">LLLLQQ", *boundaries)
174
def pack_share(prefix, verification_key, signature,
               share_hash_chain, block_hash_tree,
               share_data, encprivkey):
    """Assemble a complete share string.

    'share_hash_chain' is a dict mapping hash number to 32-byte hash; it is
    written in ascending order of hash number. 'block_hash_tree' is a
    sequence of 32-byte hashes. The result is the concatenation of prefix,
    offset table, verification key, signature, share hash chain, block
    hash tree, share data and encrypted private key.
    """
    chain_records = []
    for hashnum in sorted(share_hash_chain.keys()):
        chain_records.append(struct.pack(">H32s", hashnum,
                                         share_hash_chain[hashnum]))
    share_hash_chain_s = "".join(chain_records)

    for h in block_hash_tree:
        assert len(h) == 32
    block_hash_tree_s = "".join(block_hash_tree)

    offsets = pack_offsets(len(verification_key),
                           len(signature),
                           len(share_hash_chain_s),
                           len(block_hash_tree_s),
                           len(share_data),
                           len(encprivkey))
    pieces = [prefix, offsets, verification_key, signature,
              share_hash_chain_s, block_hash_tree_s,
              share_data, encprivkey]
    return "".join(pieces)
199
200
class RetrieveStatus:
    """Plain state holder describing one retrieve operation, with a
    getter/setter pair for most fields. Declared as implementing
    IRetrieveStatus."""
    implements(IRetrieveStatus)
    # shared by all instances: hands out a distinct counter value to each
    # status object as it is created
    statusid_counter = count(0)

    def __init__(self):
        self.counter = self.statusid_counter.next()
        self.timings = {}
        self.sharemap = None
        self.storage_index = None
        self.helper = False
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.active = True

    # storage index
    def get_storage_index(self):
        return self.storage_index
    def set_storage_index(self, si):
        self.storage_index = si

    # helper flag
    def using_helper(self):
        return self.helper
    def set_helper(self, helper):
        self.helper = helper

    # size
    def get_size(self):
        return self.size
    def set_size(self, size):
        self.size = size

    # status string
    def get_status(self):
        return self.status
    def set_status(self, status):
        self.status = status

    # progress
    def get_progress(self):
        return self.progress
    def set_progress(self, value):
        self.progress = value

    # active flag
    def get_active(self):
        return self.active
    def set_active(self, value):
        self.active = value

    # per-instance counter (read-only)
    def get_counter(self):
        return self.counter
242
243 class Retrieve:
244     def __init__(self, filenode):
245         self._node = filenode
246         self._contents = None
247         # if the filenode already has a copy of the pubkey, use it. Otherwise
248         # we'll grab a copy from the first peer we talk to.
249         self._pubkey = filenode.get_pubkey()
250         self._storage_index = filenode.get_storage_index()
251         self._readkey = filenode.get_readkey()
252         self._last_failure = None
253         self._log_number = None
254         self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
255         num = self._node._client.log("Retrieve(%s): starting" % prefix)
256         self._log_number = num
257         self._status = RetrieveStatus()
258         self._status.set_storage_index(self._storage_index)
259         self._status.set_helper(False)
260         self._status.set_progress(0.0)
261         self._status.set_active(True)
262
263     def log(self, msg, **kwargs):
264         prefix = self._log_prefix
265         num = self._node._client.log("Retrieve(%s): %s" % (prefix, msg),
266                                      parent=self._log_number, **kwargs)
267         return num
268
269     def log_err(self, f):
270         num = log.err(f, parent=self._log_number)
271         return num
272
273     def retrieve(self):
274         """Retrieve the filenode's current contents. Returns a Deferred that
275         fires with a string when the contents have been retrieved."""
276
277         # 1: make a guess as to how many peers we should send requests to. We
278         #    want to hear from k+EPSILON (k because we have to, EPSILON extra
279         #    because that helps us resist rollback attacks). [TRADEOFF:
280         #    EPSILON>0 means extra work] [TODO: implement EPSILON>0]
281         # 2: build the permuted peerlist, taking the first k+E peers
282         # 3: send readv requests to all of them in parallel, asking for the
283         #    first 2KB of data from all shares
284         # 4: when the first of the responses comes back, extract information:
285         # 4a: extract the pubkey, hash it, compare against the URI. If this
286         #     check fails, log a WEIRD and ignore the peer.
287         # 4b: extract the prefix (seqnum, roothash, k, N, segsize, datalength)
288         #     and verify the signature on it. If this is wrong, log a WEIRD
289         #     and ignore the peer. Save the prefix string in a dict that's
290         #     keyed by (seqnum,roothash) and has (prefixstring, sharemap) as
291         #     values. We'll use the prefixstring again later to avoid doing
292         #     multiple signature checks
293         # 4c: extract the share size (offset of the last byte of sharedata).
294         #     if it is larger than 2k, send new readv requests to pull down
295         #     the extra data
296         # 4d: if the extracted 'k' is more than we guessed, rebuild a larger
297         #     permuted peerlist and send out more readv requests.
298         # 5: as additional responses come back, extract the prefix and compare
299         #    against the ones we've already seen. If they match, add the
300         #    peerid to the corresponing sharemap dict
301         # 6: [TRADEOFF]: if EPSILON==0, when we get k responses for the
302         #    same (seqnum,roothash) key, attempt to reconstruct that data.
303         #    if EPSILON>0, wait for k+EPSILON responses, then attempt to
304         #    reconstruct the most popular version.. If we do not have enough
305         #    shares and there are still requests outstanding, wait. If there
306         #    are not still requests outstanding (todo: configurable), send
307         #    more requests. Never send queries to more than 2*N servers. If
308         #    we've run out of servers, fail.
309         # 7: if we discover corrupt shares during the reconstruction process,
310         #    remove that share from the sharemap.  and start step#6 again.
311
312         initial_query_count = 5
313         # how much data should be read on the first fetch? It would be nice
314         # if we could grab small directories in a single RTT. The way we pack
315         # dirnodes consumes about 112 bytes per child. The way we pack
316         # mutable files puts about 935 bytes of pubkey+sig+hashes, then our
317         # data, then about 1216 bytes of encprivkey. So 2kB ought to get us
318         # about 9 entries, which seems like a good default.
319         self._read_size = 2000
320
321         # we might not know how many shares we need yet.
322         self._required_shares = self._node.get_required_shares()
323         self._total_shares = self._node.get_total_shares()
324
325         # self._valid_versions is a dictionary in which the keys are
326         # 'verinfo' tuples (seqnum, root_hash, IV). Every time we hear about
327         # a new potential version of the file, we check its signature, and
328         # the valid ones are added to this dictionary. The values of the
329         # dictionary are (prefix, sharemap) tuples, where 'prefix' is just
330         # the first part of the share (containing the serialized verinfo),
331         # for easier comparison. 'sharemap' is a DictOfSets, in which the
332         # keys are sharenumbers, and the values are sets of (peerid, data)
333         # tuples. There is a (peerid, data) tuple for every instance of a
334         # given share that we've seen. The 'data' in this tuple is a full
335         # copy of the SDMF share, starting with the \x00 version byte and
336         # continuing through the last byte of sharedata.
337         self._valid_versions = {}
338
339         # self._valid_shares is a dict mapping (peerid,data) tuples to
340         # validated sharedata strings. Each time we examine the hash chains
341         # inside a share and validate them against a signed root_hash, we add
342         # the share to self._valid_shares . We use this to avoid re-checking
343         # the hashes over and over again.
344         self._valid_shares = {}
345
346         self._done_deferred = defer.Deferred()
347
348         d = defer.succeed(initial_query_count)
349         d.addCallback(self._choose_initial_peers)
350         d.addCallback(self._send_initial_requests)
351         d.addCallback(self._wait_for_finish)
352         return d
353
354     def _wait_for_finish(self, res):
355         return self._done_deferred
356
357     def _choose_initial_peers(self, numqueries):
358         n = self._node
359         full_peerlist = n._client.get_permuted_peers("storage",
360                                                      self._storage_index)
361
362         # _peerlist is a list of (peerid,conn) tuples for peers that are
363         # worth talking too. This starts with the first numqueries in the
364         # permuted list. If that's not enough to get us a recoverable
365         # version, we expand this to include the first 2*total_shares peerids
366         # (assuming we learn what total_shares is from one of the first
367         # numqueries peers)
368         self._peerlist = [p for p in islice(full_peerlist, numqueries)]
369         # _peerlist_limit is the query limit we used to build this list. If
370         # we later increase this limit, it may be useful to re-scan the
371         # permuted list.
372         self._peerlist_limit = numqueries
373         return self._peerlist
374
375     def _send_initial_requests(self, peerlist):
376         self._bad_peerids = set()
377         self._running = True
378         self._queries_outstanding = set()
379         self._used_peers = set()
380         self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
381         dl = []
382         for (peerid, ss) in peerlist:
383             self._queries_outstanding.add(peerid)
384             self._do_query(ss, peerid, self._storage_index, self._read_size)
385
386         # control flow beyond this point: state machine. Receiving responses
387         # from queries is the input. We might send out more queries, or we
388         # might produce a result.
389         return None
390
391     def _do_query(self, ss, peerid, storage_index, readsize):
392         self._queries_outstanding.add(peerid)
393         d = ss.callRemote("slot_readv", storage_index, [], [(0, readsize)])
394         d.addCallback(self._got_results, peerid, readsize, (ss, storage_index))
395         d.addErrback(self._query_failed, peerid)
396         # errors that aren't handled by _query_failed (and errors caused by
397         # _query_failed) get logged, but we still want to check for doneness.
398         d.addErrback(log.err)
399         d.addBoth(self._check_for_done)
400         return d
401
402     def _deserialize_pubkey(self, pubkey_s):
403         verifier = rsa.create_verifying_key_from_string(pubkey_s)
404         return verifier
405
406     def _got_results(self, datavs, peerid, readsize, stuff):
407         self._queries_outstanding.discard(peerid)
408         self._used_peers.add(peerid)
409         if not self._running:
410             return
411
412         for shnum,datav in datavs.items():
413             data = datav[0]
414             try:
415                 self._got_results_one_share(shnum, data, peerid)
416             except NeedMoreDataError, e:
417                 # ah, just re-send the query then.
418                 self._read_size = max(self._read_size, e.needed_bytes)
419                 # TODO: for MDMF, sanity-check self._read_size: don't let one
420                 # server cause us to try to read gigabytes of data from all
421                 # other servers.
422                 (ss, storage_index) = stuff
423                 self._do_query(ss, peerid, storage_index, self._read_size)
424                 return
425             except CorruptShareError, e:
426                 # log it and give the other shares a chance to be processed
427                 f = failure.Failure()
428                 self.log("bad share: %s %s" % (f, f.value), level=log.WEIRD)
429                 self._bad_peerids.add(peerid)
430                 self._last_failure = f
431                 pass
432         # all done!
433
434     def _got_results_one_share(self, shnum, data, peerid):
435         self.log("_got_results: got shnum #%d from peerid %s"
436                  % (shnum, idlib.shortnodeid_b2a(peerid)))
437         (seqnum, root_hash, IV, k, N, segsize, datalength,
438          # this might raise NeedMoreDataError, in which case the rest of
439          # the shares are probably short too. _query_failed() will take
440          # responsiblity for re-issuing the queries with a new length.
441          pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
442
443         if not self._pubkey:
444             fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
445             assert len(fingerprint) == 32
446             if fingerprint != self._node._fingerprint:
447                 raise CorruptShareError(peerid, shnum,
448                                         "pubkey doesn't match fingerprint")
449             self._pubkey = self._deserialize_pubkey(pubkey_s)
450             self._node._populate_pubkey(self._pubkey)
451
452         verinfo = (seqnum, root_hash, IV, segsize, datalength)
453         if verinfo not in self._valid_versions:
454             # it's a new pair. Verify the signature.
455             valid = self._pubkey.verify(prefix, signature)
456             if not valid:
457                 raise CorruptShareError(peerid, shnum,
458                                         "signature is invalid")
459             # ok, it's a valid verinfo. Add it to the list of validated
460             # versions.
461             self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
462                      % (seqnum, base32.b2a(root_hash)[:4],
463                         idlib.shortnodeid_b2a(peerid), shnum,
464                         k, N, segsize, datalength))
465             self._valid_versions[verinfo] = (prefix, DictOfSets())
466
467             # and make a note of the other parameters we've just learned
468             # NOTE: Retrieve needs to be refactored to put k,N in the verinfo
469             # along with seqnum/etc, to make sure we don't co-mingle shares
470             # from differently-encoded versions of the same file.
471             if self._required_shares is None:
472                 self._required_shares = k
473                 self._node._populate_required_shares(k)
474             if self._total_shares is None:
475                 self._total_shares = N
476                 self._node._populate_total_shares(N)
477
478         # reject shares that don't match our narrow-minded ideas of what
479         # encoding we're going to use. This addresses the immediate needs of
480         # ticket #312, by turning the data corruption into unavailability. To
481         # get back the availability (i.e. make sure that one weird-encoding
482         # share that happens to come back first doesn't make us ignore the
483         # rest of the shares), we need to implement the refactoring mentioned
484         # above.
485         if k != self._required_shares:
486             raise CorruptShareError(peerid, shnum,
487                                     "share has k=%d, we want k=%d" %
488                                     (k, self._required_shares))
489
490         if N != self._total_shares:
491             raise CorruptShareError(peerid, shnum,
492                                     "share has N=%d, we want N=%d" %
493                                     (N, self._total_shares))
494
495         # we've already seen this pair, and checked the signature so we
496         # know it's a valid candidate. Accumulate the share info, if
497         # there's enough data present. If not, raise NeedMoreDataError,
498         # which will trigger a re-fetch.
499         _ignored = unpack_share(data)
500         self.log(" found enough data to add share contents")
501         self._valid_versions[verinfo][1].add(shnum, (peerid, data))
502
503
504     def _query_failed(self, f, peerid):
505         if not self._running:
506             return
507         self._queries_outstanding.discard(peerid)
508         self._used_peers.add(peerid)
509         self._last_failure = f
510         self._bad_peerids.add(peerid)
511         self.log("error during query: %s %s" % (f, f.value), level=log.WEIRD)
512
513     def _check_for_done(self, res):
514         if not self._running:
515             self.log("ODD: _check_for_done but we're not running")
516             return
517         share_prefixes = {}
518         versionmap = DictOfSets()
519         for verinfo, (prefix, sharemap) in self._valid_versions.items():
520             # sharemap is a dict that maps shnums to sets of (peerid,data).
521             # len(sharemap) is the number of distinct shares that appear to
522             # be available.
523             if len(sharemap) >= self._required_shares:
524                 # this one looks retrievable. TODO: our policy of decoding
525                 # the first version that we can get is a bit troublesome: in
526                 # a small grid with a large expansion factor, a single
527                 # out-of-date server can cause us to retrieve an older
528                 # version. Fixing this is equivalent to protecting ourselves
529                 # against a rollback attack, and the best approach is
530                 # probably to say that we won't do _attempt_decode until:
531                 #  (we've received at least k+EPSILON shares or
532                 #   we've received at least k shares and ran out of servers)
533                 # in that case, identify the verinfos that are decodeable and
534                 # attempt the one with the highest (seqnum,R) value. If the
535                 # highest seqnum can't be recovered, only then might we fall
536                 # back to an older version.
537                 d = defer.maybeDeferred(self._attempt_decode, verinfo, sharemap)
538                 def _problem(f):
539                     self._last_failure = f
540                     if f.check(CorruptShareError):
541                         self.log("saw corrupt share, rescheduling",
542                                  level=log.WEIRD)
543                         # _attempt_decode is responsible for removing the bad
544                         # share, so we can just try again
545                         eventually(self._check_for_done, None)
546                         return
547                     return f
548                 d.addCallbacks(self._done, _problem)
549                 # TODO: create an errback-routing mechanism to make sure that
550                 # weird coding errors will cause the retrieval to fail rather
551                 # than hanging forever. Any otherwise-unhandled exceptions
552                 # should follow this path. A simple way to test this is to
553                 # raise BadNameError in _validate_share_and_extract_data .
554                 return
555
556         # we don't have enough shares yet. Should we send out more queries?
557         if self._queries_outstanding:
558             # there are some running, so just wait for them to come back.
559             # TODO: if our initial guess at k was too low, waiting for these
560             # responses before sending new queries will increase our latency,
561             # so we could speed things up by sending new requests earlier.
562             self.log("ROUTINE:  %d queries outstanding" %
563                      len(self._queries_outstanding))
564             return
565
566         # no more queries are outstanding. Can we send out more? First,
567         # should we be looking at more peers?
568         self.log("need more peers: N=%s, peerlist=%d peerlist_limit=%d" %
569                  (self._total_shares, len(self._peerlist),
570                   self._peerlist_limit), level=log.UNUSUAL)
571         if self._total_shares is not None:
572             search_distance = self._total_shares * 2
573         else:
574             search_distance = 20
575         self.log("search_distance=%d" % search_distance, level=log.UNUSUAL)
576         if self._peerlist_limit < search_distance:
577             # we might be able to get some more peers from the list
578             peers = self._node._client.get_permuted_peers("storage",
579                                                           self._storage_index)
580             self._peerlist = [p for p in islice(peers, search_distance)]
581             self._peerlist_limit = search_distance
582             self.log("added peers, peerlist=%d, peerlist_limit=%d"
583                      % (len(self._peerlist), self._peerlist_limit),
584                      level=log.UNUSUAL)
585         # are there any peers on the list that we haven't used?
586         new_query_peers = []
587         for (peerid, ss) in self._peerlist:
588             if peerid not in self._used_peers:
589                 new_query_peers.append( (peerid, ss) )
590                 if len(new_query_peers) > 5:
591                     # only query in batches of 5. TODO: this is pretty
592                     # arbitrary, really I want this to be something like
593                     # k - max(known_version_sharecounts) + some extra
594                     break
595         if new_query_peers:
596             self.log("sending %d new queries (read %d bytes)" %
597                      (len(new_query_peers), self._read_size), level=log.UNUSUAL)
598             for (peerid, ss) in new_query_peers:
599                 self._do_query(ss, peerid, self._storage_index, self._read_size)
600             # we'll retrigger when those queries come back
601             return
602
603         # we've used up all the peers we're allowed to search. Failure.
604         self.log("ran out of peers", level=log.WEIRD)
605         e = NotEnoughPeersError("last failure: %s" % self._last_failure)
606         return self._done(failure.Failure(e))
607
608     def _attempt_decode(self, verinfo, sharemap):
609         # sharemap is a dict which maps shnum to [(peerid,data)..] sets.
610         (seqnum, root_hash, IV, segsize, datalength) = verinfo
611
612         assert len(sharemap) >= self._required_shares, len(sharemap)
613
614         shares_s = []
615         for shnum in sorted(sharemap.keys()):
616             for shareinfo in sharemap[shnum]:
617                 shares_s.append("#%d" % shnum)
618         shares_s = ",".join(shares_s)
619         self.log("_attempt_decode: version %d-%s, shares: %s" %
620                  (seqnum, base32.b2a(root_hash)[:4], shares_s))
621
622         # first, validate each share that we haven't validated yet. We use
623         # self._valid_shares to remember which ones we've already checked.
624
625         shares = {}
626         for shnum, shareinfos in sharemap.items():
627             assert len(shareinfos) > 0
628             for shareinfo in shareinfos:
629                 # have we already validated the hashes on this share?
630                 if shareinfo not in self._valid_shares:
631                     # nope: must check the hashes and extract the actual data
632                     (peerid,data) = shareinfo
633                     try:
634                         # The (seqnum+root_hash+IV) tuple for this share was
635                         # already verified: specifically, all shares in the
636                         # sharemap have a (seqnum+root_hash+IV) pair that was
637                         # present in a validly signed prefix. The remainder
638                         # of the prefix for this particular share has *not*
639                         # been validated, but we don't care since we don't
640                         # use it. self._validate_share() is required to check
641                         # the hashes on the share data (and hash chains) to
642                         # make sure they match root_hash, but is not required
643                         # (and is in fact prohibited, because we don't
644                         # validate the prefix on all shares) from using
645                         # anything else in the share.
646                         validator = self._validate_share_and_extract_data
647                         sharedata = validator(peerid, root_hash, shnum, data)
648                         assert isinstance(sharedata, str)
649                     except CorruptShareError, e:
650                         self.log("share was corrupt: %s" % e, level=log.WEIRD)
651                         sharemap[shnum].discard(shareinfo)
652                         if not sharemap[shnum]:
653                             # remove the key so the test in _check_for_done
654                             # can accurately decide that we don't have enough
655                             # shares to try again right now.
656                             del sharemap[shnum]
657                         # If there are enough remaining shares,
658                         # _check_for_done() will try again
659                         raise
660                     # share is valid: remember it so we won't need to check
661                     # (or extract) it again
662                     self._valid_shares[shareinfo] = sharedata
663
664                 # the share is now in _valid_shares, so just copy over the
665                 # sharedata
666                 shares[shnum] = self._valid_shares[shareinfo]
667
668         # now that the big loop is done, all shares in the sharemap are
669         # valid, and they're all for the same seqnum+root_hash version, so
670         # it's now down to doing FEC and decrypt.
671         assert len(shares) >= self._required_shares, len(shares)
672         d = defer.maybeDeferred(self._decode, shares, segsize, datalength)
673         d.addCallback(self._decrypt, IV, seqnum, root_hash)
674         return d
675
676     def _validate_share_and_extract_data(self, peerid, root_hash, shnum, data):
677         # 'data' is the whole SMDF share
678         self.log("_validate_share_and_extract_data[%d]" % shnum)
679         assert data[0] == "\x00"
680         pieces = unpack_share(data)
681         (seqnum, root_hash_copy, IV, k, N, segsize, datalen,
682          pubkey, signature, share_hash_chain, block_hash_tree,
683          share_data, enc_privkey) = pieces
684
685         assert isinstance(share_data, str)
686         # build the block hash tree. SDMF has only one leaf.
687         leaves = [hashutil.block_hash(share_data)]
688         t = hashtree.HashTree(leaves)
689         if list(t) != block_hash_tree:
690             raise CorruptShareError(peerid, shnum, "block hash tree failure")
691         share_hash_leaf = t[0]
692         t2 = hashtree.IncompleteHashTree(N)
693         # root_hash was checked by the signature
694         t2.set_hashes({0: root_hash})
695         try:
696             t2.set_hashes(hashes=share_hash_chain,
697                           leaves={shnum: share_hash_leaf})
698         except (hashtree.BadHashError, hashtree.NotEnoughHashesError), e:
699             msg = "corrupt hashes: %s" % (e,)
700             raise CorruptShareError(peerid, shnum, msg)
701         self.log(" data valid! len=%d" % len(share_data))
702         return share_data
703
704     def _decode(self, shares_dict, segsize, datalength):
705         # we ought to know these values by now
706         assert self._required_shares is not None
707         assert self._total_shares is not None
708
709         # shares_dict is a dict mapping shnum to share data, but the codec
710         # wants two lists.
711         shareids = []; shares = []
712         for shareid, share in shares_dict.items():
713             shareids.append(shareid)
714             shares.append(share)
715
716         assert len(shareids) >= self._required_shares, len(shareids)
717         # zfec really doesn't want extra shares
718         shareids = shareids[:self._required_shares]
719         shares = shares[:self._required_shares]
720
721         fec = codec.CRSDecoder()
722         params = "%d-%d-%d" % (segsize,
723                                self._required_shares, self._total_shares)
724         fec.set_serialized_params(params)
725
726         self.log("params %s, we have %d shares" % (params, len(shares)))
727         self.log("about to decode, shareids=%s" % (shareids,))
728         d = defer.maybeDeferred(fec.decode, shares, shareids)
729         def _done(buffers):
730             self.log(" decode done, %d buffers" % len(buffers))
731             segment = "".join(buffers)
732             self.log(" joined length %d, datalength %d" %
733                      (len(segment), datalength))
734             segment = segment[:datalength]
735             self.log(" segment len=%d" % len(segment))
736             return segment
737         def _err(f):
738             self.log(" decode failed: %s" % f)
739             return f
740         d.addCallback(_done)
741         d.addErrback(_err)
742         return d
743
744     def _decrypt(self, crypttext, IV, seqnum, root_hash):
745         key = hashutil.ssk_readkey_data_hash(IV, self._readkey)
746         decryptor = AES(key)
747         plaintext = decryptor.process(crypttext)
748         # it worked, so record the seqnum and root_hash for next time
749         self._node._populate_seqnum(seqnum)
750         self._node._populate_root_hash(root_hash)
751         return plaintext
752
753     def _done(self, contents):
754         self.log("DONE")
755         self._running = False
756         self._status.set_active(False)
757         self._status.set_status("Done")
758         self._status.set_progress(1.0)
759         self._status.set_size(len(contents))
760         eventually(self._done_deferred.callback, contents)
761
762     def get_status(self):
763         return self._status
764
765
class DictOfSets(dict):
    """A dict whose values are sets, with a helper for adding members."""

    def add(self, key, value):
        """Add 'value' to the set stored under 'key'.

        The set is created the first time a key is used.
        """
        try:
            members = self[key]
        except KeyError:
            self[key] = set([value])
        else:
            members.add(value)
772
class PublishStatus:
    """Status record for one publish operation.

    Every instance takes its 'counter' value from a counter that is shared
    by the whole class, so each new instance gets the next number.
    """
    implements(IPublishStatus)
    statusid_counter = count(0)

    def __init__(self):
        self.counter = self.statusid_counter.next()
        self.status = "Not started"
        self.progress = 0.0
        self.active = True
        self.helper = False
        self.storage_index = None
        self.size = None
        self.sharemap = None
        self.timings = {}

    # storage index
    def get_storage_index(self):
        return self.storage_index
    def set_storage_index(self, si):
        self.storage_index = si

    # helper flag
    def using_helper(self):
        return self.helper
    def set_helper(self, helper):
        self.helper = helper

    # size
    def get_size(self):
        return self.size
    def set_size(self, size):
        self.size = size

    # status string
    def get_status(self):
        return self.status
    def set_status(self, status):
        self.status = status

    # progress
    def get_progress(self):
        return self.progress
    def set_progress(self, value):
        self.progress = value

    # active flag
    def get_active(self):
        return self.active
    def set_active(self, value):
        self.active = value

    # the counter is assigned once, in __init__, and has no setter
    def get_counter(self):
        return self.counter
814
815 class Publish:
816     """I represent a single act of publishing the mutable file to the grid."""
817
818     def __init__(self, filenode):
819         self._node = filenode
820         self._storage_index = self._node.get_storage_index()
821         self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
822         num = self._node._client.log("Publish(%s): starting" % prefix)
823         self._log_number = num
824         self._status = PublishStatus()
825         self._status.set_storage_index(self._storage_index)
826         self._status.set_helper(False)
827         self._status.set_progress(0.0)
828         self._status.set_active(True)
829
830     def log(self, *args, **kwargs):
831         if 'parent' not in kwargs:
832             kwargs['parent'] = self._log_number
833         num = log.msg(*args, **kwargs)
834         return num
835
836     def log_err(self, *args, **kwargs):
837         if 'parent' not in kwargs:
838             kwargs['parent'] = self._log_number
839         num = log.err(*args, **kwargs)
840         return num
841
842     def publish(self, newdata):
843         """Publish the filenode's current contents.  Returns a Deferred that
844         fires (with None) when the publish has done as much work as it's ever
845         going to do, or errbacks with ConsistencyError if it detects a
846         simultaneous write.
847         """
848
849         # 1: generate shares (SDMF: files are small, so we can do it in RAM)
850         # 2: perform peer selection, get candidate servers
851         #  2a: send queries to n+epsilon servers, to determine current shares
852         #  2b: based upon responses, create target map
853         # 3: send slot_testv_and_readv_and_writev messages
854         # 4: as responses return, update share-dispatch table
855         # 4a: may need to run recovery algorithm
856         # 5: when enough responses are back, we're done
857
858         self.log("starting publish, datalen is %s" % len(newdata))
859         self._started = time.time()
860         self._status.set_size(len(newdata))
861
862         self._writekey = self._node.get_writekey()
863         assert self._writekey, "need write capability to publish"
864
865         old_roothash = self._node._current_roothash
866         old_seqnum = self._node._current_seqnum
867         assert old_seqnum is not None, "must read before replace"
868         self._new_seqnum = old_seqnum + 1
869
870         # read-before-replace also guarantees these fields are available
871         readkey = self._node.get_readkey()
872         required_shares = self._node.get_required_shares()
873         total_shares = self._node.get_total_shares()
874         self._pubkey = self._node.get_pubkey()
875
876         # these two may not be, we might have to get them from the first peer
877         self._privkey = self._node.get_privkey()
878         self._encprivkey = self._node.get_encprivkey()
879
880         IV = os.urandom(16)
881
882         # we read only 1KB because all we generally care about is the seqnum
883         # ("prefix") info, so we know which shares are where. We need to get
884         # the privkey from somebody, which means reading more like 3KB, but
885         # the code in _obtain_privkey will ensure that we manage that even if
886         # we need an extra roundtrip. TODO: arrange to read 3KB from one peer
887         # who is likely to hold a share, so we can avoid the latency of that
888         # extra roundtrip. 3KB would get us the encprivkey from a dirnode
889         # with up to 7 entries, allowing us to make an update in 2 RTT
890         # instead of 3.
891         self._read_size = 1000
892
893         d = defer.succeed(total_shares)
894         d.addCallback(self._query_peers)
895         d.addCallback(self._obtain_privkey)
896
897         d.addCallback(self._encrypt_and_encode, newdata, readkey, IV,
898                       required_shares, total_shares)
899         d.addCallback(self._generate_shares, self._new_seqnum, IV)
900
901         d.addCallback(self._send_shares, IV)
902         d.addCallback(self._maybe_recover)
903         d.addCallback(self._done)
904         return d
905
906     def _query_peers(self, total_shares):
907         self.log("_query_peers")
908
909         storage_index = self._storage_index
910
911         # In 0.7.0, we went through extra work to make sure that we include
912         # ourselves in the peerlist, mainly to match Retrieve (which did the
913         # same thing. With the post-0.7.0 Introducer refactoring, we got rid
914         # of the include-myself flags, and standardized on the
915         # uploading/downloading node not being special.
916
917         # One nice feature of the old approach was that by putting a share on
918         # the local storage server, we're more likely to be able to retrieve
919         # a copy of the encrypted private key (even if all the old servers
920         # have gone away), so we can regenerate new shares even if we can't
921         # retrieve the old contents. This need will eventually go away when
922         # we switch to DSA-based mutable files (which store the private key
923         # in the URI).
924
925         peerlist = self._node._client.get_permuted_peers("storage",
926                                                          storage_index)
927
928         current_share_peers = DictOfSets()
929         reachable_peers = {}
930         # list of (peerid, shnum, offset, length) where the encprivkey might
931         # be found
932         self._encprivkey_shares = []
933
934         EPSILON = total_shares / 2
935         #partial_peerlist = islice(peerlist, total_shares + EPSILON)
936         partial_peerlist = peerlist[:total_shares+EPSILON]
937
938         self._storage_servers = {}
939
940         dl = []
941         for permutedid, (peerid, ss) in enumerate(partial_peerlist):
942             self._storage_servers[peerid] = ss
943             d = self._do_query(ss, peerid, storage_index)
944             d.addCallback(self._got_query_results,
945                           peerid, permutedid,
946                           reachable_peers, current_share_peers)
947             dl.append(d)
948         d = defer.DeferredList(dl)
949         d.addCallback(self._got_all_query_results,
950                       total_shares, reachable_peers,
951                       current_share_peers)
952         # TODO: add an errback to, probably to ignore that peer
953
954         # TODO: if we can't get a privkey from these servers, consider
955         # looking farther afield. Be aware of the old 0.7.0 behavior that
956         # causes us to create our initial directory before we've connected to
957         # anyone but ourselves.. those old directories may not be
958         # retrieveable if our own server is no longer in the early part of
959         # the permuted peerlist.
960         return d
961
962     def _do_query(self, ss, peerid, storage_index):
963         self.log("querying %s" % idlib.shortnodeid_b2a(peerid))
964         d = ss.callRemote("slot_readv",
965                           storage_index, [], [(0, self._read_size)])
966         return d
967
968     def _got_query_results(self, datavs, peerid, permutedid,
969                            reachable_peers, current_share_peers):
970
971         lp = self.log(format="_got_query_results from %(peerid)s",
972                       peerid=idlib.shortnodeid_b2a(peerid))
973         assert isinstance(datavs, dict)
974         reachable_peers[peerid] = permutedid
975         if not datavs:
976             self.log("peer has no shares", parent=lp)
977         for shnum, datav in datavs.items():
978             lp2 = self.log("peer has shnum %d" % shnum, parent=lp)
979             assert len(datav) == 1
980             data = datav[0]
981             # We want (seqnum, root_hash, IV) from all servers to know what
982             # versions we are replacing. We want the encprivkey from one
983             # server (assuming it's valid) so we know our own private key, so
984             # we can sign our update. SMDF: read the whole share from each
985             # server. TODO: later we can optimize this to transfer less data.
986
987             # we assume that we have enough data to extract the signature.
988             # TODO: if this raises NeedMoreDataError, arrange to do another
989             # read pass.
990             r = unpack_prefix_and_signature(data)
991             (seqnum, root_hash, IV, k, N, segsize, datalen,
992              pubkey_s, signature, prefix) = r
993
994             # self._pubkey is present because we require read-before-replace
995             valid = self._pubkey.verify(prefix, signature)
996             if not valid:
997                 self.log(format="bad signature from %(peerid)s shnum %(shnum)d",
998                          peerid=idlib.shortnodeid_b2a(peerid), shnum=shnum,
999                          parent=lp2, level=log.WEIRD)
1000                 continue
1001             self.log(format="peer has goodsig shnum %(shnum)d seqnum %(seqnum)d",
1002                      shnum=shnum, seqnum=seqnum,
1003                      parent=lp2, level=log.NOISY)
1004
1005             share = (shnum, seqnum, root_hash)
1006             current_share_peers.add(shnum, (peerid, seqnum, root_hash) )
1007
1008             if not self._privkey:
1009                 self._try_to_extract_privkey(data, peerid, shnum)
1010
1011
1012     def _try_to_extract_privkey(self, data, peerid, shnum):
1013         try:
1014             r = unpack_share(data)
1015         except NeedMoreDataError, e:
1016             # this share won't help us. oh well.
1017             offset = e.encprivkey_offset
1018             length = e.encprivkey_length
1019             self.log("shnum %d on peerid %s: share was too short (%dB) "
1020                      "to get the encprivkey; [%d:%d] ought to hold it" %
1021                      (shnum, idlib.shortnodeid_b2a(peerid), len(data),
1022                       offset, offset+length))
1023             # NOTE: if uncoordinated writes are taking place, someone might
1024             # change the share (and most probably move the encprivkey) before
1025             # we get a chance to do one of these reads and fetch it. This
1026             # will cause us to see a NotEnoughPeersError(unable to fetch
1027             # privkey) instead of an UncoordinatedWriteError . This is a
1028             # nuisance, but it will go away when we move to DSA-based mutable
1029             # files (since the privkey will be small enough to fit in the
1030             # write cap).
1031
1032             self._encprivkey_shares.append( (peerid, shnum, offset, length))
1033             return
1034
1035         (seqnum, root_hash, IV, k, N, segsize, datalen,
1036          pubkey, signature, share_hash_chain, block_hash_tree,
1037          share_data, enc_privkey) = r
1038
1039         return self._try_to_validate_privkey(enc_privkey, peerid, shnum)
1040
1041     def _try_to_validate_privkey(self, enc_privkey, peerid, shnum):
1042         alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
1043         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
1044         if alleged_writekey != self._writekey:
1045             self.log("invalid privkey from %s shnum %d" %
1046                      (idlib.nodeid_b2a(peerid)[:8], shnum), level=log.WEIRD)
1047             return
1048
1049         # it's good
1050         self.log("got valid privkey from shnum %d on peerid %s" %
1051                  (shnum, idlib.shortnodeid_b2a(peerid)))
1052         self._privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
1053         self._encprivkey = enc_privkey
1054         self._node._populate_encprivkey(self._encprivkey)
1055         self._node._populate_privkey(self._privkey)
1056
1057     def _got_all_query_results(self, res,
1058                                total_shares, reachable_peers,
1059                                current_share_peers):
1060         self.log("_got_all_query_results")
1061         # now that we know everything about the shares currently out there,
1062         # decide where to place the new shares.
1063
1064         # if an old share X is on a node, put the new share X there too.
1065         # TODO: 1: redistribute shares to achieve one-per-peer, by copying
1066         #       shares from existing peers to new (less-crowded) ones. The
1067         #       old shares must still be updated.
1068         # TODO: 2: move those shares instead of copying them, to reduce future
1069         #       update work
1070
1071         # if log.recording_noisy
1072         logmsg = []
1073         for shnum in range(total_shares):
1074             logmsg2 = []
1075             for oldplace in current_share_peers.get(shnum, []):
1076                 (peerid, seqnum, R) = oldplace
1077                 logmsg2.append("%s:#%d:R=%s" % (idlib.shortnodeid_b2a(peerid),
1078                                                 seqnum, base32.b2a(R)[:4]))
1079             logmsg.append("sh%d on (%s)" % (shnum, "/".join(logmsg2)))
1080         self.log("sharemap: %s" % (", ".join(logmsg)), level=log.NOISY)
1081         self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
1082                  level=log.NOISY)
1083
1084         shares_needing_homes = range(total_shares)
1085         target_map = DictOfSets() # maps shnum to set((peerid,oldseqnum,oldR))
1086         shares_per_peer = DictOfSets()
1087         for shnum in range(total_shares):
1088             for oldplace in current_share_peers.get(shnum, []):
1089                 (peerid, seqnum, R) = oldplace
1090                 if seqnum >= self._new_seqnum:
1091                     self.log("somebody has a newer sequence number than what we were uploading",
1092                              level=log.WEIRD)
1093                     self.log(format="peerid=%(peerid)s, theirs=%(seqnum)d, mine=%(new_seqnum)d",
1094                              peerid=idlib.shortnodeid_b2a(peerid),
1095                              seqnum=seqnum,
1096                              new_seqnum=self._new_seqnum)
1097                     raise UncoordinatedWriteError("somebody has a newer sequence number than us")
1098                 target_map.add(shnum, oldplace)
1099                 shares_per_peer.add(peerid, shnum)
1100                 if shnum in shares_needing_homes:
1101                     shares_needing_homes.remove(shnum)
1102
1103         # now choose homes for the remaining shares. We prefer peers with the
1104         # fewest target shares, then peers with the lowest permuted index. If
1105         # there are no shares already in place, this will assign them
1106         # one-per-peer in the normal permuted order.
1107         while shares_needing_homes:
1108             if not reachable_peers:
1109                 prefix = storage.si_b2a(self._node.get_storage_index())[:5]
1110                 raise NotEnoughPeersError("ran out of peers during upload of (%s); shares_needing_homes: %s, reachable_peers: %s" % (prefix, shares_needing_homes, reachable_peers,))
1111             shnum = shares_needing_homes.pop(0)
1112             possible_homes = reachable_peers.keys()
1113             possible_homes.sort(lambda a,b:
1114                                 cmp( (len(shares_per_peer.get(a, [])),
1115                                       reachable_peers[a]),
1116                                      (len(shares_per_peer.get(b, [])),
1117                                       reachable_peers[b]) ))
1118             target_peerid = possible_homes[0]
1119             target_map.add(shnum, (target_peerid, None, None) )
1120             shares_per_peer.add(target_peerid, shnum)
1121
1122         assert not shares_needing_homes
1123
1124         target_info = (target_map, shares_per_peer)
1125         return target_info
1126
1127     def _obtain_privkey(self, target_info):
1128         # make sure we've got a copy of our private key.
1129         if self._privkey:
1130             # Must have picked it up during _query_peers. We're good to go.
1131             return target_info
1132
1133         # Nope, we haven't managed to grab a copy, and we still need it. Ask
1134         # peers one at a time until we get a copy. Only bother asking peers
1135         # who've admitted to holding a share.
1136
1137         target_map, shares_per_peer = target_info
1138         # pull shares from self._encprivkey_shares
1139         if not self._encprivkey_shares:
1140             raise NotEnoughPeersError("Unable to find a copy of the privkey")
1141
1142         (peerid, shnum, offset, length) = self._encprivkey_shares.pop(0)
1143         ss = self._storage_servers[peerid]
1144         self.log("trying to obtain privkey from %s shnum %d" %
1145                  (idlib.shortnodeid_b2a(peerid), shnum))
1146         d = self._do_privkey_query(ss, peerid, shnum, offset, length)
1147         d.addErrback(self.log_err)
1148         d.addCallback(lambda res: self._obtain_privkey(target_info))
1149         return d
1150
1151     def _do_privkey_query(self, rref, peerid, shnum, offset, length):
1152         d = rref.callRemote("slot_readv", self._storage_index,
1153                             [shnum], [(offset, length)] )
1154         d.addCallback(self._privkey_query_response, peerid, shnum)
1155         return d
1156
1157     def _privkey_query_response(self, datav, peerid, shnum):
1158         data = datav[shnum][0]
1159         self._try_to_validate_privkey(data, peerid, shnum)
1160
1161     def _encrypt_and_encode(self, target_info,
1162                             newdata, readkey, IV,
1163                             required_shares, total_shares):
1164         self.log("_encrypt_and_encode")
1165
1166         key = hashutil.ssk_readkey_data_hash(IV, readkey)
1167         enc = AES(key)
1168         crypttext = enc.process(newdata)
1169         assert len(crypttext) == len(newdata)
1170
1171         # now apply FEC
1172         self.MAX_SEGMENT_SIZE = 1024*1024
1173         data_length = len(crypttext)
1174
1175         segment_size = min(self.MAX_SEGMENT_SIZE, len(crypttext))
1176         # this must be a multiple of self.required_shares
1177         segment_size = mathutil.next_multiple(segment_size, required_shares)
1178         if segment_size:
1179             self.num_segments = mathutil.div_ceil(len(crypttext), segment_size)
1180         else:
1181             self.num_segments = 0
1182         assert self.num_segments in [0, 1,] # SDMF restrictions
1183         fec = codec.CRSEncoder()
1184         fec.set_params(segment_size, required_shares, total_shares)
1185         piece_size = fec.get_block_size()
1186         crypttext_pieces = [None] * required_shares
1187         for i in range(len(crypttext_pieces)):
1188             offset = i * piece_size
1189             piece = crypttext[offset:offset+piece_size]
1190             piece = piece + "\x00"*(piece_size - len(piece)) # padding
1191             crypttext_pieces[i] = piece
1192             assert len(piece) == piece_size
1193
1194         d = fec.encode(crypttext_pieces)
1195         d.addCallback(lambda shares_and_shareids:
1196                       (shares_and_shareids,
1197                        required_shares, total_shares,
1198                        segment_size, data_length,
1199                        target_info) )
1200         return d
1201
1202     def _generate_shares(self, (shares_and_shareids,
1203                                 required_shares, total_shares,
1204                                 segment_size, data_length,
1205                                 target_info),
1206                          seqnum, IV):
1207         self.log("_generate_shares")
1208
1209         # we should know these by now
1210         privkey = self._privkey
1211         encprivkey = self._encprivkey
1212         pubkey = self._pubkey
1213
1214         (shares, share_ids) = shares_and_shareids
1215
1216         assert len(shares) == len(share_ids)
1217         assert len(shares) == total_shares
1218         all_shares = {}
1219         block_hash_trees = {}
1220         share_hash_leaves = [None] * len(shares)
1221         for i in range(len(shares)):
1222             share_data = shares[i]
1223             shnum = share_ids[i]
1224             all_shares[shnum] = share_data
1225
1226             # build the block hash tree. SDMF has only one leaf.
1227             leaves = [hashutil.block_hash(share_data)]
1228             t = hashtree.HashTree(leaves)
1229             block_hash_trees[shnum] = block_hash_tree = list(t)
1230             share_hash_leaves[shnum] = t[0]
1231         for leaf in share_hash_leaves:
1232             assert leaf is not None
1233         share_hash_tree = hashtree.HashTree(share_hash_leaves)
1234         share_hash_chain = {}
1235         for shnum in range(total_shares):
1236             needed_hashes = share_hash_tree.needed_hashes(shnum)
1237             share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
1238                                               for i in needed_hashes ] )
1239         root_hash = share_hash_tree[0]
1240         assert len(root_hash) == 32
1241         self.log("my new root_hash is %s" % base32.b2a(root_hash))
1242
1243         prefix = pack_prefix(seqnum, root_hash, IV,
1244                              required_shares, total_shares,
1245                              segment_size, data_length)
1246
1247         # now pack the beginning of the share. All shares are the same up
1248         # to the signature, then they have divergent share hash chains,
1249         # then completely different block hash trees + IV + share data,
1250         # then they all share the same encprivkey at the end. The sizes
1251         # of everything are the same for all shares.
1252
1253         signature = privkey.sign(prefix)
1254
1255         verification_key = pubkey.serialize()
1256
1257         final_shares = {}
1258         for shnum in range(total_shares):
1259             final_share = pack_share(prefix,
1260                                      verification_key,
1261                                      signature,
1262                                      share_hash_chain[shnum],
1263                                      block_hash_trees[shnum],
1264                                      all_shares[shnum],
1265                                      encprivkey)
1266             final_shares[shnum] = final_share
1267         return (seqnum, root_hash, final_shares, target_info)
1268
1269
1270     def _send_shares(self, (seqnum, root_hash, final_shares, target_info), IV):
1271         self.log("_send_shares")
1272         # we're finally ready to send out our shares. If we encounter any
1273         # surprises here, it's because somebody else is writing at the same
1274         # time. (Note: in the future, when we remove the _query_peers() step
1275         # and instead speculate about [or remember] which shares are where,
1276         # surprises here are *not* indications of UncoordinatedWriteError,
1277         # and we'll need to respond to them more gracefully.)
1278
1279         target_map, shares_per_peer = target_info
1280
1281         my_checkstring = pack_checkstring(seqnum, root_hash, IV)
1282         peer_messages = {}
1283         expected_old_shares = {}
1284
1285         for shnum, peers in target_map.items():
1286             for (peerid, old_seqnum, old_root_hash) in peers:
1287                 testv = [(0, len(my_checkstring), "le", my_checkstring)]
1288                 new_share = final_shares[shnum]
1289                 writev = [(0, new_share)]
1290                 if peerid not in peer_messages:
1291                     peer_messages[peerid] = {}
1292                 peer_messages[peerid][shnum] = (testv, writev, None)
1293                 if peerid not in expected_old_shares:
1294                     expected_old_shares[peerid] = {}
1295                 expected_old_shares[peerid][shnum] = (old_seqnum, old_root_hash)
1296
1297         read_vector = [(0, len(my_checkstring))]
1298
1299         dl = []
1300         # ok, send the messages!
1301         self._surprised = False
1302         dispatch_map = DictOfSets()
1303
1304         for peerid, tw_vectors in peer_messages.items():
1305
1306             write_enabler = self._node.get_write_enabler(peerid)
1307             renew_secret = self._node.get_renewal_secret(peerid)
1308             cancel_secret = self._node.get_cancel_secret(peerid)
1309             secrets = (write_enabler, renew_secret, cancel_secret)
1310
1311             d = self._do_testreadwrite(peerid, secrets,
1312                                        tw_vectors, read_vector)
1313             d.addCallback(self._got_write_answer, tw_vectors, my_checkstring,
1314                           peerid, expected_old_shares[peerid], dispatch_map)
1315             dl.append(d)
1316
1317         d = defer.DeferredList(dl)
1318         d.addCallback(lambda res: (self._surprised, dispatch_map))
1319         return d
1320
1321     def _do_testreadwrite(self, peerid, secrets,
1322                           tw_vectors, read_vector):
1323         storage_index = self._node._uri.storage_index
1324         ss = self._storage_servers[peerid]
1325
1326         d = ss.callRemote("slot_testv_and_readv_and_writev",
1327                           storage_index,
1328                           secrets,
1329                           tw_vectors,
1330                           read_vector)
1331         return d
1332
1333     def _got_write_answer(self, answer, tw_vectors, my_checkstring,
1334                           peerid, expected_old_shares,
1335                           dispatch_map):
1336         lp = self.log("_got_write_answer from %s" %
1337                       idlib.shortnodeid_b2a(peerid))
1338         wrote, read_data = answer
1339         surprised = False
1340
1341         (new_seqnum,new_root_hash,new_IV) = unpack_checkstring(my_checkstring)
1342
1343         if wrote:
1344             for shnum in tw_vectors:
1345                 dispatch_map.add(shnum, (peerid, new_seqnum, new_root_hash))
1346         else:
1347             # surprise! our testv failed, so the write did not happen
1348             self.log("our testv failed, that write did not happen",
1349                      parent=lp, level=log.WEIRD)
1350             surprised = True
1351
1352         for shnum, (old_cs,) in read_data.items():
1353             (old_seqnum, old_root_hash, IV) = unpack_checkstring(old_cs)
1354
1355             if not wrote:
1356                 dispatch_map.add(shnum, (peerid, old_seqnum, old_root_hash))
1357
1358             if shnum not in expected_old_shares:
1359                 # surprise! there was a share we didn't know about
1360                 self.log("they had share %d that we didn't know about" % shnum,
1361                          parent=lp, level=log.WEIRD)
1362                 surprised = True
1363             else:
1364                 seqnum, root_hash = expected_old_shares[shnum]
1365                 if seqnum is not None:
1366                     if seqnum != old_seqnum or root_hash != old_root_hash:
1367                         # surprise! somebody modified the share on us
1368                         self.log("somebody modified the share on us:"
1369                                  " shnum=%d: I thought they had #%d:R=%s,"
1370                                  " but testv reported #%d:R=%s" %
1371                                  (shnum,
1372                                   seqnum, base32.b2a(root_hash)[:4],
1373                                   old_seqnum, base32.b2a(old_root_hash)[:4]),
1374                                  parent=lp, level=log.WEIRD)
1375                         surprised = True
1376         if surprised:
1377             self._surprised = True
1378
1379     def _log_dispatch_map(self, dispatch_map):
1380         for shnum, places in dispatch_map.items():
1381             sent_to = [(idlib.shortnodeid_b2a(peerid),
1382                         seqnum,
1383                         base32.b2a(root_hash)[:4])
1384                        for (peerid,seqnum,root_hash) in places]
1385             self.log(" share %d sent to: %s" % (shnum, sent_to),
1386                      level=log.NOISY)
1387
1388     def _maybe_recover(self, (surprised, dispatch_map)):
1389         self.log("_maybe_recover, surprised=%s, dispatch_map:" % surprised,
1390                  level=log.NOISY)
1391         self._log_dispatch_map(dispatch_map)
1392         if not surprised:
1393             self.log(" no recovery needed")
1394             return
1395         self.log("We need recovery!", level=log.WEIRD)
1396         print "RECOVERY NOT YET IMPLEMENTED"
1397         # but dispatch_map will help us do it
1398         raise UncoordinatedWriteError("I was surprised!")
1399
1400     def _done(self, res):
1401         now = time.time()
1402         self._status.timings["total"] = now - self._started
1403         self._status.set_active(False)
1404         self._status.set_status("Done")
1405         self._status.set_progress(1.0)
1406         return None
1407
    def get_status(self):
        """Return the status object (self._status) for this operation."""
        return self._status
1410
1411
1412 # use client.create_mutable_file() to make one of these
1413
class MutableFileNode:
    """Client-side handle on a single mutable file.

    A node learns its keys either from an existing URI (init_from_uri) or
    by generating a fresh keypair (create). Reading goes through
    retrieve_class and writing through publish_class; both are given this
    node and use its accessors and _populate_* hooks.

    Use client.create_mutable_file() to make one of these.
    """
    implements(IMutableFileNode)
    publish_class = Publish
    retrieve_class = Retrieve
    SIGNATURE_KEY_SIZE = 2048
    # encoding parameters that create() assigns to a brand-new file
    DEFAULT_REQUIRED_SHARES = 3
    DEFAULT_TOTAL_SHARES = 10

    def __init__(self, client):
        """Remember the client; everything else is filled in later."""
        self._client = client
        self._pubkey = None # filled in upon first read
        self._privkey = None # filled in if we're mutable
        self._required_shares = None # ditto
        self._total_shares = None # ditto
        self._sharemap = {} # known shares, shnum-to-[nodeids]

        self._current_data = None # SDMF: we're allowed to cache the contents
        self._current_roothash = None # ditto
        self._current_seqnum = None # ditto

    def __repr__(self):
        # self._uri does not exist until init_from_uri() or create() has
        # run, and is_readonly() needs it. Check for it first so that
        # printing a half-built node (e.g. in a log message) cannot raise.
        if hasattr(self, '_uri'):
            if self.is_readonly():
                mode = 'RO'
            else:
                mode = 'RW'
            abbrev = self._uri.abbrev()
        else:
            mode = '??'
            abbrev = None
        return "<%s %x %s %s>" % (self.__class__.__name__, id(self),
                                  mode, abbrev)

    def init_from_uri(self, myuri):
        """Configure this node from an existing mutable-file URI.

        The write key is only recorded when the URI is not read-only.
        Returns self.
        """
        # we have the URI, but we have not yet retrieved the public
        # verification key, nor things like 'k' or 'N'. If and when someone
        # wants to get our contents, we'll pull from shares and fill those
        # in.
        self._uri = IMutableFileURI(myuri)
        if not self._uri.is_readonly():
            self._writekey = self._uri.writekey
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        self._fingerprint = self._uri.fingerprint
        # the following values are learned during Retrieval
        #  self._pubkey
        #  self._required_shares
        #  self._total_shares
        # and these are needed for Publish. They are filled in by Retrieval
        # if possible, otherwise by the first peer that Publish talks to.
        self._privkey = None
        self._encprivkey = None
        return self

    def create(self, initial_contents):
        """Call this when the filenode is first created. This will generate
        the keys, generate the initial shares, wait until at least numpeers
        are connected, allocate shares, and upload the initial
        contents. Returns a Deferred that fires (with the MutableFileNode
        instance you should use) when it completes.
        """
        self._required_shares = self.DEFAULT_REQUIRED_SHARES
        self._total_shares = self.DEFAULT_TOTAL_SHARES
        d = defer.maybeDeferred(self._generate_pubprivkeys)
        def _generated(keypair):
            (pubkey, privkey) = keypair
            self._pubkey, self._privkey = pubkey, privkey
            pubkey_s = self._pubkey.serialize()
            privkey_s = self._privkey.serialize()
            # every other key and identifier is derived from the keypair
            self._writekey = hashutil.ssk_writekey_hash(privkey_s)
            self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
            self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
            self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
            self._readkey = self._uri.readkey
            self._storage_index = self._uri.storage_index
            # TODO: seqnum/roothash: really we mean "doesn't matter since
            # nobody knows about us yet"
            self._current_seqnum = 0
            self._current_roothash = "\x00"*32
            return self._publish(initial_contents)
        d.addCallback(_generated)
        return d

    def _generate_pubprivkeys(self):
        """Generate a new RSA keypair; returns (verifier, signer)."""
        # RSA key generation for a 2048 bit key takes between 0.8 and 3.2 secs
        signer = rsa.generate(self.SIGNATURE_KEY_SIZE)
        verifier = signer.get_verifying_key()
        return verifier, signer

    def _publish(self, initial_contents):
        """Publish initial_contents with a new publish_class instance.

        The client is told about the publish operation before it starts.
        Returns a Deferred that fires with this node.
        """
        p = self.publish_class(self)
        self._client.notify_publish(p)
        d = p.publish(initial_contents)
        d.addCallback(lambda res: self)
        return d

    def _encrypt_privkey(self, writekey, privkey):
        """Return privkey run through AES keyed with writekey."""
        enc = AES(writekey)
        crypttext = enc.process(privkey)
        return crypttext

    def _decrypt_privkey(self, enc_privkey):
        """Return enc_privkey run through AES keyed with our write key."""
        enc = AES(self._writekey)
        privkey = enc.process(enc_privkey)
        return privkey

    def _populate(self, stuff):
        # the Retrieval object calls this with values it discovers when
        # downloading the slot. This is how a MutableFileNode that was
        # created from a URI learns about its full key.
        pass

    # Setters for individual values discovered elsewhere (see the comments
    # in init_from_uri for which ones come from Retrieval and Publish).
    def _populate_pubkey(self, pubkey):
        self._pubkey = pubkey
    def _populate_required_shares(self, required_shares):
        self._required_shares = required_shares
    def _populate_total_shares(self, total_shares):
        self._total_shares = total_shares
    def _populate_seqnum(self, seqnum):
        self._current_seqnum = seqnum
    def _populate_root_hash(self, root_hash):
        self._current_roothash = root_hash

    def _populate_privkey(self, privkey):
        self._privkey = privkey
    def _populate_encprivkey(self, encprivkey):
        self._encprivkey = encprivkey


    # Per-peer secrets: each one is a hash that mixes in the peerid, which
    # must be exactly 20 bytes long.
    def get_write_enabler(self, peerid):
        assert len(peerid) == 20
        return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
    def get_renewal_secret(self, peerid):
        assert len(peerid) == 20
        crs = self._client.get_renewal_secret()
        frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
        return hashutil.bucket_renewal_secret_hash(frs, peerid)
    def get_cancel_secret(self, peerid):
        assert len(peerid) == 20
        ccs = self._client.get_cancel_secret()
        fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
        return hashutil.bucket_cancel_secret_hash(fcs, peerid)

    def get_writekey(self):
        return self._writekey
    def get_readkey(self):
        return self._readkey
    def get_storage_index(self):
        return self._storage_index
    def get_privkey(self):
        return self._privkey
    def get_encprivkey(self):
        return self._encprivkey
    def get_pubkey(self):
        return self._pubkey

    def get_required_shares(self):
        return self._required_shares
    def get_total_shares(self):
        return self._total_shares


    def get_uri(self):
        """Return this node's URI in string form."""
        return self._uri.to_string()
    def get_size(self):
        return "?" # TODO: this is likely to cause problems, not being an int
    def get_readonly(self):
        """Return a read-only node for this file (self if already RO)."""
        if self.is_readonly():
            return self
        ro = MutableFileNode(self._client)
        ro.init_from_uri(self._uri.get_readonly())
        return ro

    def get_readonly_uri(self):
        """Return the read-only form of this node's URI as a string."""
        return self._uri.get_readonly().to_string()

    def is_mutable(self):
        return self._uri.is_mutable()
    def is_readonly(self):
        return self._uri.is_readonly()

    # Identity is (class, URI string). The URI lives in self._uri (there is
    # no 'uri' attribute on this class), and the string form is used so
    # that hashing and ordering do not depend on what the URI object itself
    # supports.
    def __hash__(self):
        return hash((self.__class__, self.get_uri()))
    def __cmp__(self, them):
        if cmp(type(self), type(them)):
            return cmp(type(self), type(them))
        if cmp(self.__class__, them.__class__):
            return cmp(self.__class__, them.__class__)
        return cmp(self.get_uri(), them.get_uri())

    def get_verifier(self):
        return IMutableFileURI(self._uri).get_verifier()

    def check(self):
        """Hand this file's verifier to the client's "checker" service."""
        verifier = self.get_verifier()
        return self._client.getServiceNamed("checker").check(verifier)

    def download(self, target):
        """Download the whole file and feed it to target in one write.

        Returns a Deferred that fires with the result of target.finish().
        """
        # fake it. TODO: make this cleaner.
        d = self.download_to_data()
        def _done(data):
            target.open(len(data))
            target.write(data)
            target.close()
            return target.finish()
        d.addCallback(_done)
        return d

    def download_to_data(self):
        """Retrieve the file; returns whatever retrieve() returns.

        The client is told about the retrieve operation before it starts.
        """
        r = self.retrieve_class(self)
        self._client.notify_retrieve(r)
        return r.retrieve()

    def replace(self, newdata):
        """Replace the file's contents with newdata.

        A retrieve is run first and its result discarded; newdata is then
        published. Returns a Deferred that fires with this node.
        """
        r = self.retrieve_class(self)
        self._client.notify_retrieve(r)
        d = r.retrieve()
        d.addCallback(lambda res: self._publish(newdata))
        return d
1620
class MutableWatcher(service.MultiService):
    """Service that tracks mutable-file publish and retrieve operations.

    Each operation passed to notify_publish() / notify_retrieve() is kept
    as a key of a WeakKeyDictionary, and its status object is appended to
    a list that is trimmed to the most recent MAX_*_STATUSES entries.
    """
    MAX_PUBLISH_STATUSES = 20
    MAX_RETRIEVE_STATUSES = 20
    name = "mutable-watcher"

    def __init__(self):
        service.MultiService.__init__(self)
        self._all_publish = weakref.WeakKeyDictionary()
        self._recent_publish_status = []
        self._all_retrieve = weakref.WeakKeyDictionary()
        self._recent_retrieve_status = []

    def notify_publish(self, p):
        """Register publish operation p and remember its status."""
        self._all_publish[p] = None
        recent = self._recent_publish_status
        recent.append(p.get_status())
        # drop the oldest entries, in place, once the list is over the limit
        overflow = len(recent) - self.MAX_PUBLISH_STATUSES
        if overflow > 0:
            del recent[:overflow]

    def list_all_publish(self):
        """Return the publish operations that are still tracked."""
        return self._all_publish.keys()
    def list_active_publish(self):
        """Return the status of every tracked publish that is active."""
        active = []
        for p in self._all_publish.keys():
            if p.get_status().get_active():
                active.append(p.get_status())
        return active
    def list_recent_publish(self):
        """Return the list of recent publish statuses, oldest first."""
        return self._recent_publish_status


    def notify_retrieve(self, r):
        """Register retrieve operation r and remember its status."""
        self._all_retrieve[r] = None
        recent = self._recent_retrieve_status
        recent.append(r.get_status())
        # drop the oldest entries, in place, once the list is over the limit
        overflow = len(recent) - self.MAX_RETRIEVE_STATUSES
        if overflow > 0:
            del recent[:overflow]

    def list_all_retrieve(self):
        """Return the retrieve operations that are still tracked."""
        return self._all_retrieve.keys()
    def list_active_retrieve(self):
        """Return the status of every tracked retrieve that is active."""
        active = []
        for r in self._all_retrieve.keys():
            if r.get_status().get_active():
                active.append(r.get_status())
        return active
    def list_recent_retrieve(self):
        """Return the list of recent retrieve statuses, oldest first."""
        return self._recent_retrieve_status