import os, struct, time
from itertools import count
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from allmydata.interfaces import IPublishStatus
from allmydata.util import base32, hashutil, mathutil, idlib, log
from allmydata import hashtree, codec, storage
from pycryptopp.cipher.aes import AES
from foolscap.eventual import eventually

from common import MODE_WRITE, UncoordinatedWriteError, DictOfSets
from servermap import ServerMap
from layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
     unpack_checkstring, SIGNED_PREFIX

class PublishStatus:
    implements(IPublishStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["send_per_server"] = {}
        self.storage_index = None
        self.encoding = ("?", "?")
        self.status = "Not started"
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def add_per_server_time(self, peerid, elapsed):
        if peerid not in self.timings["send_per_server"]:
            self.timings["send_per_server"][peerid] = []
        self.timings["send_per_server"][peerid].append(elapsed)
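        # For illustration: after a few queries the structure looks roughly
        # like self.timings["send_per_server"][peerid] == [0.31, 0.27, ...],
        # one elapsed-seconds entry per test-and-write message sent to that
        # peer (the values here are hypothetical).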

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_encoding(self):
        return self.encoding
    def using_helper(self):
        return self.helper
    def get_servermap(self):
        return self.servermap
    def get_progress(self):
        return self.progress
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
        self.helper = helper
    def set_servermap(self, servermap):
        self.servermap = servermap
    def set_encoding(self, k, n):
        self.encoding = (k, n)
    def set_size(self, size):
        self.size = size
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value

class Publish:
    """I represent a single act of publishing the mutable file to the grid. I
    will only publish my data if the servermap I am using still represents
    the current state of the world.

    To make the initial publish, set servermap to None.
    """

    # we limit the segment size as usual to constrain our memory footprint.
    # The max segsize is higher for mutable files, because we want to support
    # dirnodes with up to 10k children, and each child uses about 330 bytes.
    # If you actually put that much into a directory you'll be using a
    # footprint of around 14MB, which is higher than we'd like, but it is
    # more important right now to support large directories than to make
    # memory usage small when you use them. Once we implement MDMF (with
    # multiple segments), we will drop this back down, probably to 128KiB.
    MAX_SEGMENT_SIZE = 3500000
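    # Rough accounting behind the ~14MB figure above (illustrative only, and
    # assuming the default 3-of-10 encoding): 10k children * ~330 bytes is
    # about 3.3MB of plaintext, which fits in one SDMF segment under the
    # 3.5MB limit. Holding the plaintext, the ciphertext, and the roughly
    # 10/3 expansion of the encoded shares in RAM at the same time lands in
    # the same ballpark as the quoted peak.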

    def __init__(self, filenode, servermap):
        self._node = filenode
        self._servermap = servermap
        self._storage_index = self._node.get_storage_index()
        self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
        num = self._node._client.log("Publish(%s): starting" % prefix)
        self._log_number = num

        self._status = PublishStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)
        self._status.set_servermap(servermap)

    def log(self, *args, **kwargs):
        if 'parent' not in kwargs:
            kwargs['parent'] = self._log_number
        return log.msg(*args, **kwargs)

    def log_err(self, *args, **kwargs):
        if 'parent' not in kwargs:
            kwargs['parent'] = self._log_number
        return log.err(*args, **kwargs)

    def publish(self, newdata):
        """Publish the filenode's current contents. Returns a Deferred that
        fires (with None) when the publish has done as much work as it's ever
        going to do, or errbacks with ConsistencyError if it detects a
        simultaneous write.
        """

        # 1: generate shares (SDMF: files are small, so we can do it in RAM)
        # 2: perform peer selection, get candidate servers
        #  2a: send queries to n+epsilon servers, to determine current shares
        #  2b: based upon responses, create target map
        # 3: send slot_testv_and_readv_and_writev messages
        # 4: as responses return, update share-dispatch table
        # 4a: may need to run recovery algorithm
        # 5: when enough responses are back, we're done

        self.log("starting publish, datalen is %s" % len(newdata))
        self._status.set_size(len(newdata))
        self._status.set_status("Started")
        self._started = time.time()

        self.done_deferred = defer.Deferred()

        self._writekey = self._node.get_writekey()
        assert self._writekey, "need write capability to publish"

        # first, which servers will we publish to? We require that the
        # servermap was updated in MODE_WRITE, so we can depend upon the
        # peerlist computed by that process instead of computing our own.
        if self._servermap:
            assert self._servermap.last_update_mode == MODE_WRITE
            # we will push a version that is one larger than anything present
            # in the grid, according to the servermap.
            self._new_seqnum = self._servermap.highest_seqnum() + 1
        else:
            # If we don't have a servermap, that's because we're doing the
            # initial publish of a brand-new file, so we start at seqnum 1.
            self._new_seqnum = 1
            self._servermap = ServerMap()

        self.log(format="new seqnum will be %(seqnum)d",
                 seqnum=self._new_seqnum, level=log.NOISY)

        # having an up-to-date servermap (or using a filenode that was just
        # created for the first time) also guarantees that the following
        # fields are available
        self.readkey = self._node.get_readkey()
        self.required_shares = self._node.get_required_shares()
        assert self.required_shares is not None
        self.total_shares = self._node.get_total_shares()
        assert self.total_shares is not None
        self._status.set_encoding(self.required_shares, self.total_shares)

        self._pubkey = self._node.get_pubkey()
        self._privkey = self._node.get_privkey()
        self._encprivkey = self._node.get_encprivkey()

        client = self._node._client
        full_peerlist = client.get_permuted_peers("storage",
                                                  self._storage_index)
        self.full_peerlist = full_peerlist # for use later, immutable
        self.bad_peers = set() # peerids who have errbacked/refused requests

        self.newdata = newdata
        self.salt = os.urandom(16)

        self.setup_encoding_parameters()

        self.surprised = False

        # we keep track of three tables. The first is our goal: which share
        # we want to see on which servers. This is initially populated by the
        # existing servermap.
        self.goal = set() # pairs of (peerid, shnum) tuples

        # the second table is our list of outstanding queries: those which
        # are in flight and may or may not be delivered, accepted, or
        # acknowledged. Items are added to this table when the request is
        # sent, and removed when the response returns (or errbacks).
        self.outstanding = set() # (peerid, shnum) tuples

        # the third is a table of successes: shares which have actually been
        # placed. These are populated when responses come back with success.
        # When self.placed == self.goal, we're done.
        self.placed = set() # (peerid, shnum) tuples

        # we also keep a mapping from peerid to RemoteReference. Each time we
        # pull a connection out of the full peerlist, we add it to this for
        # use later.
        self.connections = {}

        # we use the servermap to populate the initial goal: this way we will
        # try to update each existing share in place.
        for (peerid, shnum) in self._servermap.servermap:
            self.goal.add( (peerid, shnum) )
            self.connections[peerid] = self._servermap.connections[peerid]

        # create the shares. We'll discard these as they are delivered. SDMF:
        # we're allowed to hold everything in memory.

        self._status.timings["setup"] = time.time() - self._started
        d = self._encrypt_and_encode()
        d.addCallback(self._generate_shares)
        def _start_pushing(res):
            self._started_pushing = time.time()
            return res
        d.addCallback(_start_pushing)
        d.addCallback(self.loop) # trigger delivery
        d.addErrback(self._fatal_error)

        return self.done_deferred

    def setup_encoding_parameters(self):
        segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata))
        # this must be a multiple of self.required_shares
        segment_size = mathutil.next_multiple(segment_size,
                                              self.required_shares)
        self.segment_size = segment_size
        if segment_size:
            self.num_segments = mathutil.div_ceil(len(self.newdata),
                                                  segment_size)
        else:
            self.num_segments = 0
        assert self.num_segments in [0, 1,] # SDMF restrictions
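        # Worked example (hypothetical numbers, assuming required_shares=3):
        # for a 10000-byte file, segment_size is rounded up to
        # next_multiple(10000, 3) == 10002, and num_segments is
        # div_ceil(10000, 10002) == 1. An empty file gives segment_size == 0
        # and num_segments == 0, which is why both 0 and 1 are allowed above.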

    def _fatal_error(self, f):
        self.log("error during loop", failure=f, level=log.SCARY)
        self._done(f)

    def _update_status(self):
        self._status.set_status("Sending Shares: %d placed out of %d, "
                                "%d messages outstanding" %
                                (len(self.placed),
                                 len(self.goal),
                                 len(self.outstanding)))
        self._status.set_progress(1.0 * len(self.placed) / len(self.goal))

    def loop(self, ignored=None):
        self.log("entering loop", level=log.NOISY)
        if not self._running:
            return

        # how far are we from our goal?
        needed = self.goal - self.placed - self.outstanding
        self._update_status()

        if needed:
            # we need to send out new shares
            self.log(format="need to send %(needed)d new shares",
                     needed=len(needed), level=log.NOISY)
            d = self._send_shares(needed)
            d.addCallback(self.loop)
            d.addErrback(self._fatal_error)
            return

        if self.outstanding:
            # queries are still pending, keep waiting
            self.log(format="%(outstanding)d queries still outstanding",
                     outstanding=len(self.outstanding),
                     level=log.NOISY)
            return

        # no queries outstanding, no placements needed: we're done
        self.log("no queries outstanding, no placements needed: done",
                 level=log.OPERATIONAL)
        now = time.time()
        elapsed = now - self._started_pushing
        self._status.timings["push"] = elapsed
        return self._done(None)

    def log_goal(self, goal):
        logmsg = []
        for (peerid, shnum) in goal:
            logmsg.append("sh%d to [%s]" % (shnum,
                                            idlib.shortnodeid_b2a(peerid)))
        self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
        self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
                 level=log.NOISY)

    def update_goal(self):
        # first, remove any bad peers from our goal
        self.goal = set([ (peerid, shnum)
                          for (peerid, shnum) in self.goal
                          if peerid not in self.bad_peers ])

        # find the homeless shares:
        homefull_shares = set([shnum for (peerid, shnum) in self.goal])
        homeless_shares = set(range(self.total_shares)) - homefull_shares
        homeless_shares = sorted(list(homeless_shares))
        # place them somewhere. We prefer unused servers at the beginning of
        # the available peer list.

        if not homeless_shares:
            return

        # if log.recording_noisy
        self.log_goal(self.goal)

        # if an old share X is on a node, put the new share X there too.
        # TODO: 1: redistribute shares to achieve one-per-peer, by copying
        #          shares from existing peers to new (less-crowded) ones. The
        #          old shares must still be updated.
        # TODO: 2: move those shares instead of copying them, to reduce future
        #          update work

        # this is a bit CPU intensive but easy to analyze. We create a sort
        # order for each peerid. If the peerid is marked as bad, we don't
        # even put them in the list. Then we care about the number of shares
        # which have already been assigned to them. After that we care about
        # their permutation order.
        old_assignments = DictOfSets()
        for (peerid, shnum) in self.goal:
            old_assignments.add(peerid, shnum)

        peerlist = []
        for i, (peerid, ss) in enumerate(self.full_peerlist):
            entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
            peerlist.append(entry)
        peerlist.sort()

        # we then index this peerlist with an integer, because we may have to
        # wrap. We update the goal as we go.
        i = 0
        for shnum in homeless_shares:
            (ignored1, ignored2, peerid, ss) = peerlist[i]
            self.goal.add( (peerid, shnum) )
            self.connections[peerid] = ss
            i += 1
            if i >= len(peerlist):
                i = 0
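        # Illustrative walk-through of the assignment above (hypothetical
        # peers A/B/C): each entry sorts by (shares already on that peer,
        # permuted-list position), so e.g. [(0, 1, B, ...), (0, 2, C, ...),
        # (1, 0, A, ...)] hands the first homeless share to B, the next to C,
        # then to A, and the index i wraps back to 0 if there are more
        # homeless shares than peers.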

    def _encrypt_and_encode(self):
        # this returns a Deferred that fires with a list of (sharedata,
        # sharenum) tuples. TODO: cache the ciphertext, only produce the
        # shares that we care about.
        self.log("_encrypt_and_encode")

        self._status.set_status("Encrypting")
        started = time.time()

        key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
        enc = AES(key)
        crypttext = enc.process(self.newdata)
        assert len(crypttext) == len(self.newdata)

        now = time.time()
        self._status.timings["encrypt"] = now - started
        started = now

        # now apply FEC
        self._status.set_status("Encoding")
        fec = codec.CRSEncoder()
        fec.set_params(self.segment_size,
                       self.required_shares, self.total_shares)
        piece_size = fec.get_block_size()
        crypttext_pieces = [None] * self.required_shares
        for i in range(len(crypttext_pieces)):
            offset = i * piece_size
            piece = crypttext[offset:offset+piece_size]
            piece = piece + "\x00"*(piece_size - len(piece)) # padding
            crypttext_pieces[i] = piece
            assert len(piece) == piece_size
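        # Illustrative numbers (hypothetical, continuing the example in
        # setup_encoding_parameters): with segment_size=10002 and 3-of-10
        # encoding, the block size reported by the encoder is expected to be
        # 10002/3 == 3334 bytes, so the 10000-byte ciphertext becomes three
        # 3334-byte input pieces (the last one zero-padded by two bytes), and
        # fec.encode() fires its Deferred with ten 3334-byte blocks plus
        # their share ids, one block per share.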

        d = fec.encode(crypttext_pieces)
        def _done_encoding(res):
            elapsed = time.time() - started
            self._status.timings["encode"] = elapsed
            return res
        d.addCallback(_done_encoding)
        return d

    def _generate_shares(self, shares_and_shareids):
        # this sets self.shares and self.root_hash
        self.log("_generate_shares")
        self._status.set_status("Generating Shares")
        started = time.time()

        # we should know these by now
        privkey = self._privkey
        encprivkey = self._encprivkey
        pubkey = self._pubkey

        (shares, share_ids) = shares_and_shareids

        assert len(shares) == len(share_ids)
        assert len(shares) == self.total_shares
        all_shares = {}
        block_hash_trees = {}
        share_hash_leaves = [None] * len(shares)
        for i in range(len(shares)):
            share_data = shares[i]
            shnum = share_ids[i]
            all_shares[shnum] = share_data

            # build the block hash tree. SDMF has only one leaf.
            leaves = [hashutil.block_hash(share_data)]
            t = hashtree.HashTree(leaves)
            block_hash_trees[shnum] = block_hash_tree = list(t)
            share_hash_leaves[shnum] = t[0]
        for leaf in share_hash_leaves:
            assert leaf is not None
        share_hash_tree = hashtree.HashTree(share_hash_leaves)
        share_hash_chain = {}
        for shnum in range(self.total_shares):
            needed_hashes = share_hash_tree.needed_hashes(shnum)
            share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
                                              for i in needed_hashes ] )
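        # Descriptive note: needed_hashes(shnum) is expected to return the
        # indices of the interior nodes required to check leaf `shnum` (that
        # share's block hash tree root) against the single root_hash computed
        # below, so a downloader holding one share can verify it against the
        # signed root without fetching the other shares' leaves.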
        root_hash = share_hash_tree[0]
        assert len(root_hash) == 32
        self.log("my new root_hash is %s" % base32.b2a(root_hash))

        prefix = pack_prefix(self._new_seqnum, root_hash, self.salt,
                             self.required_shares, self.total_shares,
                             self.segment_size, len(self.newdata))

        # now pack the beginning of the share. All shares are the same up
        # to the signature, then they have divergent share hash chains,
        # then completely different block hash trees + salt + share data,
        # then they all share the same encprivkey at the end. The sizes
        # of everything are the same for all shares.

        sign_started = time.time()
        signature = privkey.sign(prefix)
        self._status.timings["sign"] = time.time() - sign_started

        verification_key = pubkey.serialize()

        final_shares = {}
        for shnum in range(self.total_shares):
            final_share = pack_share(prefix,
                                     verification_key,
                                     signature,
                                     share_hash_chain[shnum],
                                     block_hash_trees[shnum],
                                     all_shares[shnum],
                                     encprivkey)
            final_shares[shnum] = final_share
        elapsed = time.time() - started
        self._status.timings["pack"] = elapsed
        self.shares = final_shares
        self.root_hash = root_hash

        # we also need to build up the version identifier for what we're
        # pushing. Extract the offsets from one of our shares.
        offsets = unpack_header(final_shares.values()[0])[-1]
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
        verinfo = (self._new_seqnum, root_hash, self.salt,
                   self.segment_size, len(self.newdata),
                   self.required_shares, self.total_shares,
                   prefix, offsets_tuple)
        self.versioninfo = verinfo

    def _send_shares(self, needed):
        self.log("_send_shares")

        # we're finally ready to send out our shares. If we encounter any
        # surprises here, it's because somebody else is writing at the same
        # time. (Note: in the future, when we remove the _query_peers() step
        # and instead speculate about [or remember] which shares are where,
        # surprises here are *not* indications of UncoordinatedWriteError,
        # and we'll need to respond to them more gracefully.)

        # needed is a set of (peerid, shnum) tuples. The first thing we do is
        # organize it by peerid.

        peermap = DictOfSets()
        for (peerid, shnum) in needed:
            peermap.add(peerid, shnum)

        # the next thing is to build up a bunch of test vectors. The
        # semantics of Publish are that we perform the operation if the world
        # hasn't changed since the ServerMap was constructed (more or less).
        # For every share we're trying to place, we create a test vector that
        # tests to see if the server*share still corresponds to the
        # expected version.
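        # For illustration (shapes only, matching the construction below):
        # all_tw_vectors maps
        #   peerid -> { shnum: ([testv, ...], [(offset, data), ...], new_length) }
        # where each testv is an (offset, length, operator, specimen) tuple
        # such as (0, len(old_checkstring), "eq", old_checkstring), the write
        # vector rewrites the whole share from offset 0, and new_length is
        # left as None.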

        all_tw_vectors = {} # maps peerid to tw_vectors
        sm = self._servermap.servermap

        for key in needed:
            (peerid, shnum) = key

            if key in sm:
                # an old version of that share already exists on the
                # server, according to our servermap. We will create a
                # request that attempts to replace it.
                old_versionid, old_timestamp = sm[key]
                (old_seqnum, old_root_hash, old_salt, old_segsize,
                 old_datalength, old_k, old_N, old_prefix,
                 old_offsets_tuple) = old_versionid
                old_checkstring = pack_checkstring(old_seqnum,
                                                   old_root_hash,
                                                   old_salt)
                testv = (0, len(old_checkstring), "eq", old_checkstring)
            else:
                # add a testv that requires the share not exist
                #testv = (0, 1, 'eq', "")

                # Unfortunately, foolscap-0.2.5 has a bug in the way inbound
                # constraints are handled. If the same object is referenced
                # multiple times inside the arguments, foolscap emits a
                # 'reference' token instead of a distinct copy of the
                # argument. The bug is that these 'reference' tokens are not
                # accepted by the inbound constraint code. To work around
                # this, we need to prevent python from interning the
                # (constant) tuple, by creating a new copy of this vector
                # each time. This bug is fixed in later versions of foolscap.
                testv = tuple([0, 1, 'eq', ""])

            testvs = [testv]
            # the write vector is simply the share
            writev = [(0, self.shares[shnum])]

            if peerid not in all_tw_vectors:
                all_tw_vectors[peerid] = {}
                # maps shnum to (testvs, writevs, new_length)
            assert shnum not in all_tw_vectors[peerid]

            all_tw_vectors[peerid][shnum] = (testvs, writev, None)

        # we read the checkstring back from each share, however we only use
        # it to detect whether there was a new share that we didn't know
        # about. The success or failure of the write will tell us whether
        # there was a collision or not. If there is a collision, the first
        # thing we'll do is update the servermap, which will find out what
        # happened. We could conceivably reduce a roundtrip by using the
        # readv checkstring to populate the servermap, but really we'd have
        # to read enough data to validate the signatures too, so it wouldn't
        # be worth it.
        read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]
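        # Descriptive note: SIGNED_PREFIX is a struct format string, so this
        # asks each server for the first struct.calcsize(SIGNED_PREFIX) bytes
        # of every slot it holds; the checkstring fields (seqnum, root_hash,
        # salt) that unpack_checkstring extracts in _got_write_answer sit at
        # the start of that prefix.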

        # ok, send the messages!
        started = time.time()
        dl = []
        for (peerid, tw_vectors) in all_tw_vectors.items():

            write_enabler = self._node.get_write_enabler(peerid)
            renew_secret = self._node.get_renewal_secret(peerid)
            cancel_secret = self._node.get_cancel_secret(peerid)
            secrets = (write_enabler, renew_secret, cancel_secret)
            shnums = tw_vectors.keys()

            d = self._do_testreadwrite(peerid, secrets,
                                       tw_vectors, read_vector)
            d.addCallbacks(self._got_write_answer, self._got_write_error,
                           callbackArgs=(peerid, shnums, started),
                           errbackArgs=(peerid, shnums, started))
            d.addErrback(self._fatal_error)
            dl.append(d)

        self._update_status()
        return defer.DeferredList(dl) # purely for testing

    def _do_testreadwrite(self, peerid, secrets,
                          tw_vectors, read_vector):
        storage_index = self._storage_index
        ss = self.connections[peerid]

        #print "SS[%s] is %s" % (idlib.shortnodeid_b2a(peerid), ss), ss.tracker.interfaceName
        d = ss.callRemote("slot_testv_and_readv_and_writev",
                          storage_index,
                          secrets,
                          tw_vectors,
                          read_vector)
        return d

    def _got_write_answer(self, answer, peerid, shnums, started):
        lp = self.log("_got_write_answer from %s" %
                      idlib.shortnodeid_b2a(peerid))
        for shnum in shnums:
            self.outstanding.discard( (peerid, shnum) )

        now = time.time()
        elapsed = now - started
        self._status.add_per_server_time(peerid, elapsed)

        wrote, read_data = answer

        if not wrote:
            self.log("our testv failed, so the write did not happen",
                     parent=lp, level=log.WEIRD)
            self.surprised = True
            self.bad_peers.add(peerid) # don't ask them again
            # use the checkstring to add information to the log message
            for (shnum,readv) in read_data.items():
                checkstring = readv[0]
                (other_seqnum,
                 other_roothash,
                 other_salt) = unpack_checkstring(checkstring)
                expected_version = self._servermap.version_on_peer(peerid,
                                                                   shnum)
                (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                 offsets_tuple) = expected_version
                self.log("somebody modified the share on us:"
                         " shnum=%d: I thought they had #%d:R=%s,"
                         " but testv reported #%d:R=%s" %
                         (shnum,
                          seqnum, base32.b2a(root_hash)[:4],
                          other_seqnum, base32.b2a(other_roothash)[:4]),
                         parent=lp, level=log.NOISY)
            # self.loop() will take care of finding new homes
            return

        for shnum in shnums:
            self.placed.add( (peerid, shnum) )
            # and update the servermap
            self._servermap.add_new_share(peerid, shnum,
                                          self.versioninfo, started)

        surprise_shares = set(read_data.keys()) - set(shnums)
        if surprise_shares:
            self.log("they had shares %s that we didn't know about" %
                     (list(surprise_shares),),
                     parent=lp, level=log.WEIRD)
            self.surprised = True

        # self.loop() will take care of checking to see if we're done
        return

    def _got_write_error(self, f, peerid, shnums, started):
        for shnum in shnums:
            self.outstanding.discard( (peerid, shnum) )
        self.bad_peers.add(peerid)
        self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s",
                 shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid),
                 failure=f)
        # self.loop() will take care of checking to see if we're done
        return

    def _log_dispatch_map(self, dispatch_map):
        for shnum, places in dispatch_map.items():
            sent_to = [(idlib.shortnodeid_b2a(peerid),
                        seqnum,
                        base32.b2a(root_hash)[:4])
                       for (peerid,seqnum,root_hash) in places]
            self.log(" share %d sent to: %s" % (shnum, sent_to),
                     level=log.NOISY)

    def _maybe_recover(self, (surprised, dispatch_map)):
        self.log("_maybe_recover, surprised=%s, dispatch_map:" % surprised,
                 level=log.NOISY)
        self._log_dispatch_map(dispatch_map)
        if not surprised:
            self.log(" no recovery needed")
            return
        self.log("We need recovery!", level=log.WEIRD)
        print "RECOVERY NOT YET IMPLEMENTED"
        # but dispatch_map will help us do it
        raise UncoordinatedWriteError("I was surprised!")

    def _done(self, res):
        if not self._running:
            return
        self._running = False
        now = time.time()
        self._status.timings["total"] = now - self._started
        self._status.set_active(False)
        if isinstance(res, failure.Failure):
            self.log("Publish done, with failure", failure=res)
            self._status.set_status("Failed")
        else:
            self._status.set_status("Done")
            self._status.set_progress(1.0)
        eventually(self.done_deferred.callback, res)

    def get_status(self):
        return self._status