3 import os, struct, time
4 from itertools import count
5 from zope.interface import implements
6 from twisted.internet import defer
7 from twisted.python import failure
8 from allmydata.interfaces import IPublishStatus
9 from allmydata.util import base32, hashutil, mathutil, idlib, log
10 from allmydata import hashtree, codec
11 from allmydata.storage.server import si_b2a
12 from pycryptopp.cipher.aes import AES
13 from foolscap.api import eventually, fireEventually
15 from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, DictOfSets, \
16 UncoordinatedWriteError, NotEnoughServersError
17 from allmydata.mutable.servermap import ServerMap
18 from allmydata.mutable.layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
19 unpack_checkstring, SIGNED_PREFIX
# NOTE(review): line-numbered excerpt with elided lines (see the gaps in the
# embedded numbering). The enclosing `class PublishStatus` header and the
# `def __init__` line are not visible here; the statements below appear to be
# the class body (zope.interface declaration, shared id counter) followed by
# the tail of __init__ initializing per-operation status fields.
22 implements(IPublishStatus)
23 statusid_counter = count(0)
# __init__ fragment: per-server timing dict, storage index, (k, n) encoding
# parameters, human-readable status string, unique counter id, start time.
26 self.timings["send_per_server"] = {}
30 self.storage_index = None
32 self.encoding = ("?", "?")
34 self.status = "Not started"
# .next() on the itertools.count iterator -- Python 2 idiom.
36 self.counter = self.statusid_counter.next()
37 self.started = time.time()
def add_per_server_time(self, peerid, elapsed):
    """Record the elapsed time of one write to `peerid`.

    Appends `elapsed` (seconds) to the per-server list kept in
    self.timings["send_per_server"], creating the list on first use.
    """
    per_server = self.timings["send_per_server"]
    per_server.setdefault(peerid, []).append(elapsed)
# NOTE(review): accessor excerpt -- several one-line method bodies are elided
# (e.g. get_started, get_encoding, using_helper, set_helper, set_size,
# set_status, set_progress, set_active); only their `def` lines survive in
# this view. These are simple getters/setters for PublishStatus fields.
44 def get_started(self):
46 def get_storage_index(self):
47 return self.storage_index
48 def get_encoding(self):
50 def using_helper(self):
52 def get_servermap(self):
58 def get_progress(self):
62 def get_counter(self):
65 def set_storage_index(self, si):
66 self.storage_index = si
67 def set_helper(self, helper):
69 def set_servermap(self, servermap):
70 self.servermap = servermap
# (k, n) = (required shares, total shares) for the erasure coding.
71 def set_encoding(self, k, n):
72 self.encoding = (k, n)
73 def set_size(self, size):
75 def set_status(self, status):
77 def set_progress(self, value):
79 def set_active(self, value):
# NOTE(review): excerpt with elided lines. LoopLimitExceededError's body
# (presumably just `pass`) and the `class Publish` header are missing from
# this view; the docstring fragment below belongs to the Publish class.
82 class LoopLimitExceededError(Exception):
86 """I represent a single act of publishing the mutable file to the grid. I
87 will only publish my data if the servermap I am using still represents
88 the current state of the world.
90 To make the initial publish, set servermap to None.
# __init__: stash collaborators, derive a short storage-index prefix for log
# messages, and create the PublishStatus tracker for this operation.
93 def __init__(self, filenode, storage_broker, servermap):
# NOTE(review): self._node is read below but its assignment (presumably
# `self._node = filenode` on the elided line 94) is not visible here --
# confirm against the full source.
95 self._storage_broker = storage_broker
96 self._servermap = servermap
97 self._storage_index = self._node.get_storage_index()
98 self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
99 num = self.log("Publish(%s): starting" % prefix, parent=None)
100 self._log_number = num
# remembers the first write failure, reported if we run out of servers
102 self._first_write_error = None
104 self._status = PublishStatus()
105 self._status.set_storage_index(self._storage_index)
106 self._status.set_helper(False)
107 self._status.set_progress(0.0)
108 self._status.set_active(True)
# body elided in this view (presumably `return self._status`)
110 def get_status(self):
def log(self, *args, **kwargs):
    """Emit a log message scoped to this publish operation.

    Defaults the 'parent' kwarg to this operation's log number and the
    'facility' kwarg to "tahoe.mutable.publish" when the caller did not
    supply them, then forwards everything to log.msg().
    """
    kwargs.setdefault("parent", self._log_number)
    kwargs.setdefault("facility", "tahoe.mutable.publish")
    return log.msg(*args, **kwargs)
# NOTE(review): line-numbered excerpt; many lines are elided (docstring tail,
# the `if self._servermap:`/`else:` branch around the seqnum computation,
# assertions on keys, and the setup comments' closing lines). The visible
# flow: record status, compute the new sequence number from the servermap,
# gather encoding parameters and keys from the filenode, initialize the
# goal/outstanding/placed bookkeeping tables, then chain
# encrypt -> encode -> generate shares -> loop on a Deferred.
120 def publish(self, newdata):
121 """Publish the filenode's current contents. Returns a Deferred that
122 fires (with None) when the publish has done as much work as it's ever
123 going to do, or errbacks with ConsistencyError if it detects a
127 # 1: generate shares (SDMF: files are small, so we can do it in RAM)
128 # 2: perform peer selection, get candidate servers
129 # 2a: send queries to n+epsilon servers, to determine current shares
130 # 2b: based upon responses, create target map
131 # 3: send slot_testv_and_readv_and_writev messages
132 # 4: as responses return, update share-dispatch table
133 # 4a: may need to run recovery algorithm
134 # 5: when enough responses are back, we're done
136 self.log("starting publish, datalen is %s" % len(newdata))
137 self._status.set_size(len(newdata))
138 self._status.set_status("Started")
139 self._started = time.time()
141 self.done_deferred = defer.Deferred()
143 self._writekey = self._node.get_writekey()
144 assert self._writekey, "need write capability to publish"
146 # first, which servers will we publish to? We require that the
147 # servermap was updated in MODE_WRITE, so we can depend upon the
148 # peerlist computed by that process instead of computing our own.
# NOTE(review): the `if self._servermap:` guard for this branch appears to be
# on an elided line (149).
150 assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
151 # we will push a version that is one larger than anything present
152 # in the grid, according to the servermap.
153 self._new_seqnum = self._servermap.highest_seqnum() + 1
155 # If we don't have a servermap, that's because we're doing the
# else-branch: initial publish of a brand-new file starts with an empty map
# and (presumably, on an elided line) seqnum 1.
158 self._servermap = ServerMap()
159 self._status.set_servermap(self._servermap)
161 self.log(format="new seqnum will be %(seqnum)d",
162 seqnum=self._new_seqnum, level=log.NOISY)
164 # having an up-to-date servermap (or using a filenode that was just
165 # created for the first time) also guarantees that the following
166 # fields are available
167 self.readkey = self._node.get_readkey()
168 self.required_shares = self._node.get_required_shares()
169 assert self.required_shares is not None
170 self.total_shares = self._node.get_total_shares()
171 assert self.total_shares is not None
172 self._status.set_encoding(self.required_shares, self.total_shares)
174 self._pubkey = self._node.get_pubkey()
176 self._privkey = self._node.get_privkey()
178 self._encprivkey = self._node.get_encprivkey()
180 sb = self._storage_broker
181 full_peerlist = sb.get_servers_for_index(self._storage_index)
182 self.full_peerlist = full_peerlist # for use later, immutable
183 self.bad_peers = set() # peerids who have errbacked/refused requests
185 self.newdata = newdata
# fresh random salt for this version's encryption key derivation
186 self.salt = os.urandom(16)
188 self.setup_encoding_parameters()
190 # if we experience any surprises (writes which were rejected because
191 # our test vector did not match, or shares which we didn't expect to
192 # see), we set this flag and report an UncoordinatedWriteError at the
193 # end of the publish process.
194 self.surprised = False
196 # as a failsafe, refuse to iterate through self.loop more than a
198 self.looplimit = 1000
200 # we keep track of three tables. The first is our goal: which share
201 # we want to see on which servers. This is initially populated by the
202 # existing servermap.
203 self.goal = set() # pairs of (peerid, shnum) tuples
205 # the second table is our list of outstanding queries: those which
206 # are in flight and may or may not be delivered, accepted, or
207 # acknowledged. Items are added to this table when the request is
208 # sent, and removed when the response returns (or errbacks).
209 self.outstanding = set() # (peerid, shnum) tuples
211 # the third is a table of successes: share which have actually been
212 # placed. These are populated when responses come back with success.
213 # When self.placed == self.goal, we're done.
214 self.placed = set() # (peerid, shnum) tuples
216 # we also keep a mapping from peerid to RemoteReference. Each time we
217 # pull a connection out of the full peerlist, we add it to this for
219 self.connections = {}
221 self.bad_share_checkstrings = {}
223 # we use the servermap to populate the initial goal: this way we will
224 # try to update each existing share in place.
225 for (peerid, shnum) in self._servermap.servermap:
226 self.goal.add( (peerid, shnum) )
227 self.connections[peerid] = self._servermap.connections[peerid]
228 # then we add in all the shares that were bad (corrupted, bad
229 # signatures, etc). We want to replace these.
230 for key, old_checkstring in self._servermap.bad_shares.items():
231 (peerid, shnum) = key
233 self.bad_share_checkstrings[key] = old_checkstring
234 self.connections[peerid] = self._servermap.connections[peerid]
236 # create the shares. We'll discard these as they are delivered. SDMF:
237 # we're allowed to hold everything in memory.
239 self._status.timings["setup"] = time.time() - self._started
240 d = self._encrypt_and_encode()
241 d.addCallback(self._generate_shares)
242 def _start_pushing(res):
243 self._started_pushing = time.time()
245 d.addCallback(_start_pushing)
246 d.addCallback(self.loop) # trigger delivery
247 d.addErrback(self._fatal_error)
249 return self.done_deferred
# setup_encoding_parameters: compute the SDMF segment size (padded up to a
# multiple of required_shares) and the segment count (0 or 1 for SDMF).
# NOTE(review): excerpt -- the second argument of div_ceil, the zero-length
# branch condition, and the `else:` line are elided from this view.
251 def setup_encoding_parameters(self):
252 segment_size = len(self.newdata)
253 # this must be a multiple of self.required_shares
254 segment_size = mathutil.next_multiple(segment_size,
255 self.required_shares)
256 self.segment_size = segment_size
258 self.num_segments = mathutil.div_ceil(len(self.newdata),
261 self.num_segments = 0
262 assert self.num_segments in [0, 1,] # SDMF restrictions
# _fatal_error: last-resort errback for the publish chain.
# NOTE(review): the follow-up call (presumably `self._done(f)` on elided
# lines 266-267) is not visible here.
264 def _fatal_error(self, f):
265 self.log("error during loop", failure=f, level=log.UNUSUAL)
# _update_status: refresh the human-readable status string and the
# placed/goal progress fraction on the PublishStatus object.
# NOTE(review): the first two %-arguments (presumably len(self.placed) and
# len(self.goal) on elided lines) are not visible here.
268 def _update_status(self):
269 self._status.set_status("Sending Shares: %d placed out of %d, "
270 "%d messages outstanding" %
273 len(self.outstanding)))
274 self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
# loop: the publish state machine's main iteration. Re-entered after each
# batch of responses; decides whether to send more shares, keep waiting on
# outstanding queries, or declare the publish done.
# NOTE(review): excerpt -- the early-return for not-running, the looplimit
# decrement, the `if self.surprised:` guard, the update_goal() call, the
# `if needed:` / `if self.outstanding:` branch headers, and `now = time.time()`
# all appear to be on elided lines.
276 def loop(self, ignored=None):
277 self.log("entering loop", level=log.NOISY)
278 if not self._running:
282 if self.looplimit <= 0:
283 raise LoopLimitExceededError("loop limit exceeded")
286 # don't send out any new shares, just wait for the outstanding
287 # ones to be retired.
288 self.log("currently surprised, so don't send any new shares",
292 # how far are we from our goal?
293 needed = self.goal - self.placed - self.outstanding
294 self._update_status()
297 # we need to send out new shares
298 self.log(format="need to send %(needed)d new shares",
299 needed=len(needed), level=log.NOISY)
300 self._send_shares(needed)
304 # queries are still pending, keep waiting
305 self.log(format="%(outstanding)d queries still outstanding",
306 outstanding=len(self.outstanding),
310 # no queries outstanding, no placements needed: we're done
311 self.log("no queries outstanding, no placements needed: done",
312 level=log.OPERATIONAL)
314 elapsed = now - self._started_pushing
315 self._status.timings["push"] = elapsed
316 return self._done(None)
# log_goal: log the current (peerid, shnum) placement goal at NOISY level,
# sorted by share number, plus the sequence number we plan to push.
# NOTE(review): `logmsg` is appended to below but its initializer (presumably
# `logmsg = []` on the elided line 319) is not visible here; the `message`
# parameter also appears unused in the visible lines -- confirm against the
# full source.
318 def log_goal(self, goal, message=""):
320 for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]):
321 logmsg.append("sh%d to [%s]" % (shnum,
322 idlib.shortnodeid_b2a(peerid)))
323 self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
324 self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
# update_goal: drop bad peers from the goal, then assign every homeless
# share number to a server, preferring servers that currently hold the
# fewest shares and come earliest in the permuted peer list.
# NOTE(review): excerpt -- the `peerlist = []` initializer, the `continue`
# for bad peers, the empty-peerlist guard, the index variable `i`'s
# initialization/increment, and the wrap-around reset are on elided lines.
327 def update_goal(self):
328 # if log.recording_noisy
330 self.log_goal(self.goal, "before update: ")
332 # first, remove any bad peers from our goal
333 self.goal = set([ (peerid, shnum)
334 for (peerid, shnum) in self.goal
335 if peerid not in self.bad_peers ])
337 # find the homeless shares:
338 homefull_shares = set([shnum for (peerid, shnum) in self.goal])
339 homeless_shares = set(range(self.total_shares)) - homefull_shares
340 homeless_shares = sorted(list(homeless_shares))
341 # place them somewhere. We prefer unused servers at the beginning of
342 # the available peer list.
344 if not homeless_shares:
347 # if an old share X is on a node, put the new share X there too.
348 # TODO: 1: redistribute shares to achieve one-per-peer, by copying
349 # shares from existing peers to new (less-crowded) ones. The
350 # old shares must still be updated.
351 # TODO: 2: move those shares instead of copying them, to reduce future
354 # this is a bit CPU intensive but easy to analyze. We create a sort
355 # order for each peerid. If the peerid is marked as bad, we don't
356 # even put them in the list. Then we care about the number of shares
357 # which have already been assigned to them. After that we care about
358 # their permutation order.
359 old_assignments = DictOfSets()
360 for (peerid, shnum) in self.goal:
361 old_assignments.add(peerid, shnum)
364 for i, (peerid, ss) in enumerate(self.full_peerlist):
365 if peerid in self.bad_peers:
# sort key: (current load, permuted position); peerid/ss ride along
367 entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
368 peerlist.append(entry)
# raised when every known server has errbacked or refused us
372 raise NotEnoughServersError("Ran out of non-bad servers, "
374 str(self._first_write_error),
375 self._first_write_error)
377 # we then index this peerlist with an integer, because we may have to
378 # wrap. We update the goal as we go.
380 for shnum in homeless_shares:
381 (ignored1, ignored2, peerid, ss) = peerlist[i]
382 # if we are forced to send a share to a server that already has
383 # one, we may have two write requests in flight, and the
384 # servermap (which was computed before either request was sent)
385 # won't reflect the new shares, so the second response will be
386 # surprising. There is code in _got_write_answer() to tolerate
387 # this, otherwise it would cause the publish to fail with an
388 # UncoordinatedWriteError. See #546 for details of the trouble
389 # this used to cause.
390 self.goal.add( (peerid, shnum) )
391 self.connections[peerid] = ss
393 if i >= len(peerlist):
396 self.log_goal(self.goal, "after update: ")
# _encrypt_and_encode: AES-encrypt the new contents under a salted readkey
# derivative, pad the ciphertext into required_shares equal pieces, and hand
# them to the CRS erasure coder; fires a Deferred with the encoded shares.
# NOTE(review): excerpt -- the cipher construction (presumably `enc = AES(key)`
# near elided line 410), the `now = time.time()` before the timing record, the
# segment-count branch, and the final `return d` are on elided lines.
400 def _encrypt_and_encode(self):
401 # this returns a Deferred that fires with a list of (sharedata,
402 # sharenum) tuples. TODO: cache the ciphertext, only produce the
403 # shares that we care about.
404 self.log("_encrypt_and_encode")
406 self._status.set_status("Encrypting")
407 started = time.time()
409 key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
411 crypttext = enc.process(self.newdata)
# AES-CTR keeps length: ciphertext must match plaintext exactly
412 assert len(crypttext) == len(self.newdata)
415 self._status.timings["encrypt"] = now - started
420 self._status.set_status("Encoding")
421 fec = codec.CRSEncoder()
422 fec.set_params(self.segment_size,
423 self.required_shares, self.total_shares)
424 piece_size = fec.get_block_size()
425 crypttext_pieces = [None] * self.required_shares
426 for i in range(len(crypttext_pieces)):
427 offset = i * piece_size
428 piece = crypttext[offset:offset+piece_size]
429 piece = piece + "\x00"*(piece_size - len(piece)) # padding
430 crypttext_pieces[i] = piece
431 assert len(piece) == piece_size
433 d = fec.encode(crypttext_pieces)
434 def _done_encoding(res):
435 elapsed = time.time() - started
436 self._status.timings["encode"] = elapsed
# NOTE(review): _done_encoding presumably returns `res` on an elided line
# (437) so the encoded shares propagate down the callback chain.
438 d.addCallback(_done_encoding)
# _generate_shares: turn the erasure-coded blocks into fully-packed SDMF
# shares: per-share block hash trees, the share hash tree and chains, the
# signed prefix, and finally the version identifier (verinfo) for this push.
# Sets self.shares, self.root_hash, and self.versioninfo.
# NOTE(review): excerpt -- the `shnum = share_ids[i]` binding, the
# `all_shares = {}` / `final_shares = {}` initializers, and several
# pack_share() arguments (verification_key, signature, encprivkey, the
# share data itself) are on elided lines.
441 def _generate_shares(self, shares_and_shareids):
442 # this sets self.shares and self.root_hash
443 self.log("_generate_shares")
444 self._status.set_status("Generating Shares")
445 started = time.time()
447 # we should know these by now
448 privkey = self._privkey
449 encprivkey = self._encprivkey
450 pubkey = self._pubkey
452 (shares, share_ids) = shares_and_shareids
454 assert len(shares) == len(share_ids)
455 assert len(shares) == self.total_shares
457 block_hash_trees = {}
458 share_hash_leaves = [None] * len(shares)
459 for i in range(len(shares)):
460 share_data = shares[i]
462 all_shares[shnum] = share_data
464 # build the block hash tree. SDMF has only one leaf.
465 leaves = [hashutil.block_hash(share_data)]
466 t = hashtree.HashTree(leaves)
467 block_hash_trees[shnum] = list(t)
468 share_hash_leaves[shnum] = t[0]
469 for leaf in share_hash_leaves:
470 assert leaf is not None
471 share_hash_tree = hashtree.HashTree(share_hash_leaves)
472 share_hash_chain = {}
473 for shnum in range(self.total_shares):
474 needed_hashes = share_hash_tree.needed_hashes(shnum)
475 share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
476 for i in needed_hashes ] )
477 root_hash = share_hash_tree[0]
# SHA-256d output: exactly 32 bytes
478 assert len(root_hash) == 32
479 self.log("my new root_hash is %s" % base32.b2a(root_hash))
480 self._new_version_info = (self._new_seqnum, root_hash, self.salt)
482 prefix = pack_prefix(self._new_seqnum, root_hash, self.salt,
483 self.required_shares, self.total_shares,
484 self.segment_size, len(self.newdata))
486 # now pack the beginning of the share. All shares are the same up
487 # to the signature, then they have divergent share hash chains,
488 # then completely different block hash trees + salt + share data,
489 # then they all share the same encprivkey at the end. The sizes
490 # of everything are the same for all shares.
492 sign_started = time.time()
493 signature = privkey.sign(prefix)
494 self._status.timings["sign"] = time.time() - sign_started
496 verification_key = pubkey.serialize()
499 for shnum in range(self.total_shares):
500 final_share = pack_share(prefix,
503 share_hash_chain[shnum],
504 block_hash_trees[shnum],
507 final_shares[shnum] = final_share
508 elapsed = time.time() - started
509 self._status.timings["pack"] = elapsed
510 self.shares = final_shares
511 self.root_hash = root_hash
513 # we also need to build up the version identifier for what we're
514 # pushing. Extract the offsets from one of our shares.
516 offsets = unpack_header(final_shares.values()[0])[-1]
517 offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
518 verinfo = (self._new_seqnum, root_hash, self.salt,
519 self.segment_size, len(self.newdata),
520 self.required_shares, self.total_shares,
521 prefix, offsets_tuple)
522 self.versioninfo = verinfo
# _send_shares: build per-peer test-and-write vectors for every needed
# (peerid, shnum) pair and fire the slot_testv_and_readv_and_writev calls.
# Test vectors pin the world to the servermap's view (or to the known-bad
# checkstring, or to "share must not exist"); the read vector fetches the
# checkstring back for surprise detection.
# NOTE(review): excerpt -- the loop header over (peerid, shnum) pairs, the
# `if key in sm:` / final `else:` branch lines, the remaining
# pack_checkstring arguments, the `testvs = [testv]` wrapper, and the
# per-shnum outstanding-marking loop header are on elided lines.
526 def _send_shares(self, needed):
527 self.log("_send_shares")
529 # we're finally ready to send out our shares. If we encounter any
530 # surprises here, it's because somebody else is writing at the same
531 # time. (Note: in the future, when we remove the _query_peers() step
532 # and instead speculate about [or remember] which shares are where,
533 # surprises here are *not* indications of UncoordinatedWriteError,
534 # and we'll need to respond to them more gracefully.)
536 # needed is a set of (peerid, shnum) tuples. The first thing we do is
537 # organize it by peerid.
539 peermap = DictOfSets()
540 for (peerid, shnum) in needed:
541 peermap.add(peerid, shnum)
543 # the next thing is to build up a bunch of test vectors. The
544 # semantics of Publish are that we perform the operation if the world
545 # hasn't changed since the ServerMap was constructed (more or less).
546 # For every share we're trying to place, we create a test vector that
547 # tests to see if the server*share still corresponds to the
550 all_tw_vectors = {} # maps peerid to tw_vectors
551 sm = self._servermap.servermap
554 (peerid, shnum) = key
557 # an old version of that share already exists on the
558 # server, according to our servermap. We will create a
559 # request that attempts to replace it.
560 old_versionid, old_timestamp = sm[key]
561 (old_seqnum, old_root_hash, old_salt, old_segsize,
562 old_datalength, old_k, old_N, old_prefix,
563 old_offsets_tuple) = old_versionid
564 old_checkstring = pack_checkstring(old_seqnum,
567 testv = (0, len(old_checkstring), "eq", old_checkstring)
569 elif key in self.bad_share_checkstrings:
570 old_checkstring = self.bad_share_checkstrings[key]
571 testv = (0, len(old_checkstring), "eq", old_checkstring)
574 # add a testv that requires the share not exist
576 # Unfortunately, foolscap-0.2.5 has a bug in the way inbound
577 # constraints are handled. If the same object is referenced
578 # multiple times inside the arguments, foolscap emits a
579 # 'reference' token instead of a distinct copy of the
580 # argument. The bug is that these 'reference' tokens are not
581 # accepted by the inbound constraint code. To work around
582 # this, we need to prevent python from interning the
583 # (constant) tuple, by creating a new copy of this vector
586 # This bug is fixed in foolscap-0.2.6, and even though this
587 # version of Tahoe requires foolscap-0.3.1 or newer, we are
588 # supposed to be able to interoperate with older versions of
589 # Tahoe which are allowed to use older versions of foolscap,
590 # including foolscap-0.2.5 . In addition, I've seen other
591 # foolscap problems triggered by 'reference' tokens (see #541
592 # for details). So we must keep this workaround in place.
594 #testv = (0, 1, 'eq', "")
595 testv = tuple([0, 1, 'eq', ""])
598 # the write vector is simply the share
599 writev = [(0, self.shares[shnum])]
601 if peerid not in all_tw_vectors:
602 all_tw_vectors[peerid] = {}
603 # maps shnum to (testvs, writevs, new_length)
604 assert shnum not in all_tw_vectors[peerid]
606 all_tw_vectors[peerid][shnum] = (testvs, writev, None)
608 # we read the checkstring back from each share, however we only use
609 # it to detect whether there was a new share that we didn't know
610 # about. The success or failure of the write will tell us whether
611 # there was a collision or not. If there is a collision, the first
612 # thing we'll do is update the servermap, which will find out what
613 # happened. We could conceivably reduce a roundtrip by using the
614 # readv checkstring to populate the servermap, but really we'd have
615 # to read enough data to validate the signatures too, so it wouldn't
617 read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]
619 # ok, send the messages!
620 self.log("sending %d shares" % len(all_tw_vectors), level=log.NOISY)
621 started = time.time()
622 for (peerid, tw_vectors) in all_tw_vectors.items():
624 write_enabler = self._node.get_write_enabler(peerid)
625 renew_secret = self._node.get_renewal_secret(peerid)
626 cancel_secret = self._node.get_cancel_secret(peerid)
627 secrets = (write_enabler, renew_secret, cancel_secret)
628 shnums = tw_vectors.keys()
631 self.outstanding.add( (peerid, shnum) )
633 d = self._do_testreadwrite(peerid, secrets,
634 tw_vectors, read_vector)
635 d.addCallbacks(self._got_write_answer, self._got_write_error,
636 callbackArgs=(peerid, shnums, started),
637 errbackArgs=(peerid, shnums, started))
638 # tolerate immediate errback, like with DeadReferenceError
639 d.addBoth(fireEventually)
640 d.addCallback(self.loop)
641 d.addErrback(self._fatal_error)
643 self._update_status()
644 self.log("%d shares sent" % len(all_tw_vectors), level=log.NOISY)
# _do_testreadwrite: issue the remote slot_testv_and_readv_and_writev call
# to one server's RemoteReference; returns the foolscap Deferred.
# NOTE(review): excerpt -- the remainder of the callRemote argument list
# (storage_index, secrets, tw_vectors, read_vector on elided lines 653+)
# and the `return d` are not visible here.
646 def _do_testreadwrite(self, peerid, secrets,
647 tw_vectors, read_vector):
648 storage_index = self._storage_index
649 ss = self.connections[peerid]
651 #print "SS[%s] is %s" % (idlib.shortnodeid_b2a(peerid), ss), ss.tracker.interfaceName
652 d = ss.callRemote("slot_testv_and_readv_and_writev",
# _got_write_answer: process one server's (wrote, read_data) response.
# Retires the outstanding entries, records per-server timing, classifies
# surprise shares (same-version convergent writes are tolerated; different
# versions signal an uncoordinated write), handles testv failure, and on
# success records placements in self.placed and the servermap.
# NOTE(review): excerpt -- the `for shnum in shnums:` retirement loop header,
# `now = time.time()`, the surprise-filtering branch headers, the
# `if not wrote:` guard, the checkstring unpacking's first targets
# (other_seqnum, other_roothash), and the success-path loop header are on
# elided lines.
659 def _got_write_answer(self, answer, peerid, shnums, started):
660 lp = self.log("_got_write_answer from %s" %
661 idlib.shortnodeid_b2a(peerid))
663 self.outstanding.discard( (peerid, shnum) )
666 elapsed = now - started
667 self._status.add_per_server_time(peerid, elapsed)
669 wrote, read_data = answer
# shares the server reported that we did not just write there
671 surprise_shares = set(read_data.keys()) - set(shnums)
674 for shnum in surprise_shares:
675 # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
676 checkstring = read_data[shnum][0]
677 their_version_info = unpack_checkstring(checkstring)
678 if their_version_info == self._new_version_info:
679 # they have the right share, somehow
681 if (peerid,shnum) in self.goal:
682 # and we want them to have it, so we probably sent them a
683 # copy in an earlier write. This is ok, and avoids the
687 # They aren't in our goal, but they are still for the right
688 # version. Somebody else wrote them, and it's a convergent
689 # uncoordinated write. Pretend this is ok (don't be
690 # surprised), since I suspect there's a decent chance that
691 # we'll hit this in normal operation.
695 # the new shares are of a different version
696 if peerid in self._servermap.reachable_peers:
697 # we asked them about their shares, so we had knowledge
698 # of what they used to have. Any surprising shares must
699 # have come from someone else, so UCW.
702 # we didn't ask them, and now we've discovered that they
703 # have a share we didn't know about. This indicates that
704 # mapupdate should have wokred harder and asked more
705 # servers before concluding that it knew about them all.
707 # signal UCW, but make sure to ask this peer next time,
708 # so we'll remember to update it if/when we retry.
710 # TODO: ask this peer next time. I don't yet have a good
711 # way to do this. Two insufficient possibilities are:
713 # self._servermap.add_new_share(peerid, shnum, verinfo, now)
714 # but that requires fetching/validating/parsing the whole
715 # version string, and all we have is the checkstring
716 # self._servermap.mark_bad_share(peerid, shnum, checkstring)
717 # that will make publish overwrite the share next time,
718 # but it won't re-query the server, and it won't make
719 # mapupdate search further
721 # TODO later: when publish starts, do
722 # servermap.get_best_version(), extract the seqnum,
723 # subtract one, and store as highest-replaceable-seqnum.
724 # Then, if this surprise-because-we-didn't-ask share is
725 # of highest-replaceable-seqnum or lower, we're allowed
726 # to replace it: send out a new writev (or rather add it
727 # to self.goal and loop).
733 self.log("they had shares %s that we didn't know about" %
734 (list(surprise_shares),),
735 parent=lp, level=log.WEIRD, umid="un9CSQ")
736 self.surprised = True
739 # TODO: there are two possibilities. The first is that the server
740 # is full (or just doesn't want to give us any room), which means
741 # we shouldn't ask them again, but is *not* an indication of an
742 # uncoordinated write. The second is that our testv failed, which
743 # *does* indicate an uncoordinated write. We currently don't have
744 # a way to tell these two apart (in fact, the storage server code
745 # doesn't have the option of refusing our share).
747 # If the server is full, mark the peer as bad (so we don't ask
748 # them again), but don't set self.surprised. The loop() will find
751 # If the testv failed, log it, set self.surprised, but don't
752 # bother adding to self.bad_peers .
754 self.log("our testv failed, so the write did not happen",
755 parent=lp, level=log.WEIRD, umid="8sc26g")
756 self.surprised = True
757 self.bad_peers.add(peerid) # don't ask them again
758 # use the checkstring to add information to the log message
759 for (shnum,readv) in read_data.items():
760 checkstring = readv[0]
763 other_salt) = unpack_checkstring(checkstring)
764 expected_version = self._servermap.version_on_peer(peerid,
767 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
768 offsets_tuple) = expected_version
769 self.log("somebody modified the share on us:"
770 " shnum=%d: I thought they had #%d:R=%s,"
771 " but testv reported #%d:R=%s" %
773 seqnum, base32.b2a(root_hash)[:4],
774 other_seqnum, base32.b2a(other_roothash)[:4]),
775 parent=lp, level=log.NOISY)
776 # if expected_version==None, then we didn't expect to see a
777 # share on that peer, and the 'surprise_shares' clause above
778 # will have logged it.
779 # self.loop() will take care of finding new homes
783 self.placed.add( (peerid, shnum) )
784 # and update the servermap
785 self._servermap.add_new_share(peerid, shnum,
786 self.versioninfo, started)
788 # self.loop() will take care of checking to see if we're done
# _got_write_error: errback for one server's write -- retire its outstanding
# entries, mark the peer bad so we never ask it again, remember the first
# failure (reported by NotEnoughServersError later), and log.
# NOTE(review): excerpt -- the loop header over shnums (presumably
# `for shnum in shnums:` on elided line 792) and the log level/umid kwargs
# (elided lines 799-800) are not visible here.
791 def _got_write_error(self, f, peerid, shnums, started):
793 self.outstanding.discard( (peerid, shnum) )
794 self.bad_peers.add(peerid)
795 if self._first_write_error is None:
796 self._first_write_error = f
797 self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s",
798 shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid),
801 # self.loop() will take care of checking to see if we're done
# _done: finish the publish exactly once -- record total elapsed time,
# translate the surprised flag into an UncoordinatedWriteError failure,
# set the final status string, and fire done_deferred on a later turn.
# NOTE(review): excerpt -- the early `return` for the already-finished case,
# `now = time.time()`, and the `elif self.surprised:` / `else:` branch
# headers are on elided lines.
805 def _done(self, res):
806 if not self._running:
808 self._running = False
810 self._status.timings["total"] = now - self._started
811 self._status.set_active(False)
812 if isinstance(res, failure.Failure):
813 self.log("Publish done, with failure", failure=res,
814 level=log.WEIRD, umid="nRsR9Q")
815 self._status.set_status("Failed")
817 self.log("Publish done, UncoordinatedWriteError", level=log.UNUSUAL)
818 self._status.set_status("UncoordinatedWriteError")
# wrap in a Failure so callers see an errback, not a plain result
820 res = failure.Failure(UncoordinatedWriteError())
823 self.log("Publish done, success")
824 self._status.set_status("Finished")
825 self._status.set_progress(1.0)
# eventually() fires on a later reactor turn, so callbacks added by the
# caller after publish() returns still run.
826 eventually(self.done_deferred.callback, res)