# src/allmydata/mutable/publish.py

import os, struct, time
from itertools import count
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from allmydata.interfaces import IPublishStatus
from allmydata.util import base32, hashutil, mathutil, idlib, log
from allmydata import hashtree, codec, storage
from pycryptopp.cipher.aes import AES
from foolscap.eventual import eventually

from common import MODE_WRITE, DictOfSets, \
     UncoordinatedWriteError, NotEnoughServersError
from servermap import ServerMap
from layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
     unpack_checkstring, SIGNED_PREFIX

class PublishStatus:
    implements(IPublishStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["send_per_server"] = {}
        self.servermap = None
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        self.encoding = ("?", "?")
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def add_per_server_time(self, peerid, elapsed):
        if peerid not in self.timings["send_per_server"]:
            self.timings["send_per_server"][peerid] = []
        self.timings["send_per_server"][peerid].append(elapsed)

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_encoding(self):
        return self.encoding
    def using_helper(self):
        return self.helper
    def get_servermap(self):
        return self.servermap
    def get_size(self):
        return self.size
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
        self.helper = helper
    def set_servermap(self, servermap):
        self.servermap = servermap
    def set_encoding(self, k, n):
        self.encoding = (k, n)
    def set_size(self, size):
        self.size = size
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value

class Publish:
    """I represent a single act of publishing the mutable file to the grid. I
    will only publish my data if the servermap I am using still represents
    the current state of the world.

    To make the initial publish, set servermap to None.
    """

    # we limit the segment size as usual to constrain our memory footprint.
    # The max segsize is higher for mutable files, because we want to support
    # dirnodes with up to 10k children, and each child uses about 330 bytes.
    # If you actually put that much into a directory you'll be using a
    # footprint of around 14MB, which is higher than we'd like, but it is
    # more important right now to support large directories than to make
    # memory usage small when you use them. Once we implement MDMF (with
    # multiple segments), we will drop this back down, probably to 128KiB.
    MAX_SEGMENT_SIZE = 3500000
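    # (A rough sanity check of the numbers above: 10,000 children at ~330
    # bytes each is about 3.3MB of directory plaintext; the ~14MB footprint
    # presumably comes from the several in-memory copies made while
    # encrypting, encoding, and packing shares, though that breakdown is an
    # assumption rather than something this file spells out.)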

    def __init__(self, filenode, servermap):
        self._node = filenode
        self._servermap = servermap
        self._storage_index = self._node.get_storage_index()
        self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
        num = self._node._client.log("Publish(%s): starting" % prefix)
        self._log_number = num
        self._running = True

        self._status = PublishStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if 'parent' not in kwargs:
            kwargs['parent'] = self._log_number
        return log.msg(*args, **kwargs)

    def publish(self, newdata):
        """Publish the filenode's current contents.  Returns a Deferred that
        fires (with None) when the publish has done as much work as it's ever
        going to do, or errbacks with UncoordinatedWriteError if it detects a
        simultaneous write.
        """

        # 1: generate shares (SDMF: files are small, so we can do it in RAM)
        # 2: perform peer selection, get candidate servers
        #  2a: send queries to n+epsilon servers, to determine current shares
        #  2b: based upon responses, create target map
        # 3: send slot_testv_and_readv_and_writev messages
        # 4: as responses return, update share-dispatch table
        # 4a: may need to run recovery algorithm
        # 5: when enough responses are back, we're done

        self.log("starting publish, datalen is %s" % len(newdata))
        self._status.set_size(len(newdata))
        self._status.set_status("Started")
        self._started = time.time()

        self.done_deferred = defer.Deferred()

        self._writekey = self._node.get_writekey()
        assert self._writekey, "need write capability to publish"

        # first, which servers will we publish to? We require that the
        # servermap was updated in MODE_WRITE, so we can depend upon the
        # peerlist computed by that process instead of computing our own.
        if self._servermap:
            assert self._servermap.last_update_mode == MODE_WRITE
            # we will push a version that is one larger than anything present
            # in the grid, according to the servermap.
            self._new_seqnum = self._servermap.highest_seqnum() + 1
        else:
            # If we don't have a servermap, that's because we're doing the
            # initial publish
            self._new_seqnum = 1
            self._servermap = ServerMap()
        self._status.set_servermap(self._servermap)

        self.log(format="new seqnum will be %(seqnum)d",
                 seqnum=self._new_seqnum, level=log.NOISY)

        # having an up-to-date servermap (or using a filenode that was just
        # created for the first time) also guarantees that the following
        # fields are available
        self.readkey = self._node.get_readkey()
        self.required_shares = self._node.get_required_shares()
        assert self.required_shares is not None
        self.total_shares = self._node.get_total_shares()
        assert self.total_shares is not None
        self._status.set_encoding(self.required_shares, self.total_shares)

        self._pubkey = self._node.get_pubkey()
        assert self._pubkey
        self._privkey = self._node.get_privkey()
        assert self._privkey
        self._encprivkey = self._node.get_encprivkey()

        client = self._node._client
        full_peerlist = client.get_permuted_peers("storage",
                                                  self._storage_index)
        self.full_peerlist = full_peerlist # for use later, immutable
        self.bad_peers = set() # peerids who have errbacked/refused requests

        self.newdata = newdata
        self.salt = os.urandom(16)

        self.setup_encoding_parameters()

        # if we experience any surprises (writes which were rejected because
        # our test vector did not match, or shares which we didn't expect to
        # see), we set this flag and report an UncoordinatedWriteError at the
        # end of the publish process.
        self.surprised = False

        # as a failsafe, refuse to iterate through self.loop more than a
        # thousand times.
        self.looplimit = 1000

        # we keep track of three tables. The first is our goal: which share
        # we want to see on which servers. This is initially populated by the
        # existing servermap.
        self.goal = set() # pairs of (peerid, shnum) tuples

        # the second table is our list of outstanding queries: those which
        # are in flight and may or may not be delivered, accepted, or
        # acknowledged. Items are added to this table when the request is
        # sent, and removed when the response returns (or errbacks).
        self.outstanding = set() # (peerid, shnum) tuples

        # the third is a table of successes: shares which have actually been
        # placed. These are populated when responses come back with success.
        # When self.placed == self.goal, we're done.
        self.placed = set() # (peerid, shnum) tuples

        # we also keep a mapping from peerid to RemoteReference. Each time we
        # pull a connection out of the full peerlist, we add it to this for
        # use later.
        self.connections = {}

        # we use the servermap to populate the initial goal: this way we will
        # try to update each existing share in place.
        for (peerid, shnum) in self._servermap.servermap:
            self.goal.add( (peerid, shnum) )
            self.connections[peerid] = self._servermap.connections[peerid]

        # create the shares. We'll discard these as they are delivered. SDMF:
        # we're allowed to hold everything in memory.

        self._status.timings["setup"] = time.time() - self._started
        d = self._encrypt_and_encode()
        d.addCallback(self._generate_shares)
        def _start_pushing(res):
            self._started_pushing = time.time()
            return res
        d.addCallback(_start_pushing)
        d.addCallback(self.loop) # trigger delivery
        d.addErrback(self._fatal_error)

        return self.done_deferred

    def setup_encoding_parameters(self):
        segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata))
        # this must be a multiple of self.required_shares
        segment_size = mathutil.next_multiple(segment_size,
                                              self.required_shares)
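        # (Illustrative arithmetic, assuming next_multiple rounds up to the
        # nearest multiple: with required_shares=3 and a 1000-byte file,
        # segment_size becomes 1002, which divides evenly into 3 pieces of
        # 334 bytes each for FEC.)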
        self.segment_size = segment_size
        if segment_size:
            self.num_segments = mathutil.div_ceil(len(self.newdata),
                                                  segment_size)
        else:
            self.num_segments = 0
        assert self.num_segments in [0, 1,] # SDMF restrictions

    def _fatal_error(self, f):
        self.log("error during loop", failure=f, level=log.SCARY)
        self._done(f)

    def _update_status(self):
        self._status.set_status("Sending Shares: %d placed out of %d, "
                                "%d messages outstanding" %
                                (len(self.placed),
                                 len(self.goal),
                                 len(self.outstanding)))
        self._status.set_progress(1.0 * len(self.placed) / len(self.goal))

    def loop(self, ignored=None):
        self.log("entering loop", level=log.NOISY)
        if not self._running:
            return

        self.looplimit -= 1
        if self.looplimit <= 0:
            raise RuntimeError("loop limit exceeded")

        if self.surprised:
            # don't send out any new shares, just wait for the outstanding
            # ones to be retired.
            self.log("currently surprised, so don't send any new shares",
                     level=log.NOISY)
        else:
            self.update_goal()
            # how far are we from our goal?
            needed = self.goal - self.placed - self.outstanding
            self._update_status()

            if needed:
                # we need to send out new shares
                self.log(format="need to send %(needed)d new shares",
                         needed=len(needed), level=log.NOISY)
                self._send_shares(needed)
                return

        if self.outstanding:
            # queries are still pending, keep waiting
            self.log(format="%(outstanding)d queries still outstanding",
                     outstanding=len(self.outstanding),
                     level=log.NOISY)
            return

        # no queries outstanding, no placements needed: we're done
        self.log("no queries outstanding, no placements needed: done",
                 level=log.OPERATIONAL)
        now = time.time()
        elapsed = now - self._started_pushing
        self._status.timings["push"] = elapsed
        return self._done(None)

    def log_goal(self, goal, message=""):
        logmsg = [message]
        for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]):
            logmsg.append("sh%d to [%s]" % (shnum,
                                            idlib.shortnodeid_b2a(peerid)))
        self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
        self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
                 level=log.NOISY)

    def update_goal(self):
        # if log.recording_noisy
        if True:
            self.log_goal(self.goal, "before update: ")

        # first, remove any bad peers from our goal
        self.goal = set([ (peerid, shnum)
                          for (peerid, shnum) in self.goal
                          if peerid not in self.bad_peers ])

        # find the homeless shares:
        homefull_shares = set([shnum for (peerid, shnum) in self.goal])
        homeless_shares = set(range(self.total_shares)) - homefull_shares
        homeless_shares = sorted(list(homeless_shares))
        # place them somewhere. We prefer unused servers at the beginning of
        # the available peer list.

        if not homeless_shares:
            return

        # if an old share X is on a node, put the new share X there too.
        # TODO: 1: redistribute shares to achieve one-per-peer, by copying
        #       shares from existing peers to new (less-crowded) ones. The
        #       old shares must still be updated.
        # TODO: 2: move those shares instead of copying them, to reduce future
        #       update work

        # this is a bit CPU intensive but easy to analyze. We create a sort
        # order for each peerid. If the peerid is marked as bad, we don't
        # even put them in the list. Then we care about the number of shares
        # which have already been assigned to them. After that we care about
        # their permutation order.
        old_assignments = DictOfSets()
        for (peerid, shnum) in self.goal:
            old_assignments.add(peerid, shnum)

        peerlist = []
        for i, (peerid, ss) in enumerate(self.full_peerlist):
            if peerid in self.bad_peers:
                continue
            entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
            peerlist.append(entry)
        peerlist.sort()
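        # Sorting these tuples in ascending order means we prefer the peers
        # that currently hold the fewest shares in our goal, breaking ties by
        # their position in the permuted peer list.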

        if not peerlist:
            raise NotEnoughServersError("Ran out of non-bad servers")

        new_assignments = []
        # we then index this peerlist with an integer, because we may have to
        # wrap. We update the goal as we go.
        i = 0
        for shnum in homeless_shares:
            (ignored1, ignored2, peerid, ss) = peerlist[i]
            # TODO: if we are forced to send a share to a server that already
            # has one, we may have two write requests in flight, and the
            # servermap (which was computed before either request was sent)
            # won't reflect the new shares, so the second response will cause
            # us to be surprised ("unexpected share on peer"), causing the
            # publish to fail with an UncoordinatedWriteError. This is
            # troublesome but not really a big problem. Fix it at some point.
            self.goal.add( (peerid, shnum) )
            self.connections[peerid] = ss
            i += 1
            if i >= len(peerlist):
                i = 0
        if True:
            self.log_goal(self.goal, "after update: ")



    def _encrypt_and_encode(self):
        # this returns a Deferred that fires with a list of (sharedata,
        # sharenum) tuples. TODO: cache the ciphertext, only produce the
        # shares that we care about.
        self.log("_encrypt_and_encode")

        self._status.set_status("Encrypting")
        started = time.time()

        key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
        enc = AES(key)
        crypttext = enc.process(self.newdata)
        assert len(crypttext) == len(self.newdata)

        now = time.time()
        self._status.timings["encrypt"] = now - started
        started = now

        # now apply FEC

        self._status.set_status("Encoding")
        fec = codec.CRSEncoder()
        fec.set_params(self.segment_size,
                       self.required_shares, self.total_shares)
        piece_size = fec.get_block_size()
        crypttext_pieces = [None] * self.required_shares
        for i in range(len(crypttext_pieces)):
            offset = i * piece_size
            piece = crypttext[offset:offset+piece_size]
            piece = piece + "\x00"*(piece_size - len(piece)) # padding
            crypttext_pieces[i] = piece
            assert len(piece) == piece_size
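        # (Only the final piece can come up short, so in practice only that
        # one is zero-padded; the assert just confirms that every piece
        # handed to the encoder is exactly piece_size bytes long.)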

        d = fec.encode(crypttext_pieces)
        def _done_encoding(res):
            elapsed = time.time() - started
            self._status.timings["encode"] = elapsed
            return res
        d.addCallback(_done_encoding)
        return d

    def _generate_shares(self, shares_and_shareids):
        # this sets self.shares and self.root_hash
        self.log("_generate_shares")
        self._status.set_status("Generating Shares")
        started = time.time()

        # we should know these by now
        privkey = self._privkey
        encprivkey = self._encprivkey
        pubkey = self._pubkey

        (shares, share_ids) = shares_and_shareids

        assert len(shares) == len(share_ids)
        assert len(shares) == self.total_shares
        all_shares = {}
        block_hash_trees = {}
        share_hash_leaves = [None] * len(shares)
        for i in range(len(shares)):
            share_data = shares[i]
            shnum = share_ids[i]
            all_shares[shnum] = share_data

            # build the block hash tree. SDMF has only one leaf.
            leaves = [hashutil.block_hash(share_data)]
            t = hashtree.HashTree(leaves)
            block_hash_trees[shnum] = block_hash_tree = list(t)
            share_hash_leaves[shnum] = t[0]
        for leaf in share_hash_leaves:
            assert leaf is not None
        share_hash_tree = hashtree.HashTree(share_hash_leaves)
        share_hash_chain = {}
        for shnum in range(self.total_shares):
            needed_hashes = share_hash_tree.needed_hashes(shnum)
            share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
                                              for i in needed_hashes ] )
        root_hash = share_hash_tree[0]
        assert len(root_hash) == 32
        self.log("my new root_hash is %s" % base32.b2a(root_hash))

        prefix = pack_prefix(self._new_seqnum, root_hash, self.salt,
                             self.required_shares, self.total_shares,
                             self.segment_size, len(self.newdata))

        # now pack the beginning of the share. All shares are the same up
        # to the signature, then they have divergent share hash chains,
        # then completely different block hash trees + salt + share data,
        # then they all share the same encprivkey at the end. The sizes
        # of everything are the same for all shares.

        sign_started = time.time()
        signature = privkey.sign(prefix)
        self._status.timings["sign"] = time.time() - sign_started

        verification_key = pubkey.serialize()

        final_shares = {}
        for shnum in range(self.total_shares):
            final_share = pack_share(prefix,
                                     verification_key,
                                     signature,
                                     share_hash_chain[shnum],
                                     block_hash_trees[shnum],
                                     all_shares[shnum],
                                     encprivkey)
            final_shares[shnum] = final_share
        elapsed = time.time() - started
        self._status.timings["pack"] = elapsed
        self.shares = final_shares
        self.root_hash = root_hash

        # we also need to build up the version identifier for what we're
        # pushing. Extract the offsets from one of our shares.
        assert final_shares
        offsets = unpack_header(final_shares.values()[0])[-1]
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
        verinfo = (self._new_seqnum, root_hash, self.salt,
                   self.segment_size, len(self.newdata),
                   self.required_shares, self.total_shares,
                   prefix, offsets_tuple)
        self.versioninfo = verinfo



    def _send_shares(self, needed):
        self.log("_send_shares")

        # we're finally ready to send out our shares. If we encounter any
        # surprises here, it's because somebody else is writing at the same
        # time. (Note: in the future, when we remove the _query_peers() step
        # and instead speculate about [or remember] which shares are where,
        # surprises here are *not* indications of UncoordinatedWriteError,
        # and we'll need to respond to them more gracefully.)

        # needed is a set of (peerid, shnum) tuples. The first thing we do is
        # organize it by peerid.

        peermap = DictOfSets()
        for (peerid, shnum) in needed:
            peermap.add(peerid, shnum)

        # the next thing is to build up a bunch of test vectors. The
        # semantics of Publish are that we perform the operation if the world
        # hasn't changed since the ServerMap was constructed (more or less).
        # For every share we're trying to place, we create a test vector that
        # tests to see if the server*share still corresponds to the
        # map.

        all_tw_vectors = {} # maps peerid to tw_vectors
        sm = self._servermap.servermap

        for key in needed:
            (peerid, shnum) = key

            if key in sm:
                # an old version of that share already exists on the
                # server, according to our servermap. We will create a
                # request that attempts to replace it.
                old_versionid, old_timestamp = sm[key]
                (old_seqnum, old_root_hash, old_salt, old_segsize,
                 old_datalength, old_k, old_N, old_prefix,
                 old_offsets_tuple) = old_versionid
                old_checkstring = pack_checkstring(old_seqnum,
                                                   old_root_hash,
                                                   old_salt)
                testv = (0, len(old_checkstring), "eq", old_checkstring)

            else:
                # add a testv that requires the share not exist
                #testv = (0, 1, 'eq', "")

                # Unfortunately, foolscap-0.2.5 has a bug in the way inbound
                # constraints are handled. If the same object is referenced
                # multiple times inside the arguments, foolscap emits a
                # 'reference' token instead of a distinct copy of the
                # argument. The bug is that these 'reference' tokens are not
                # accepted by the inbound constraint code. To work around
                # this, we need to prevent python from interning the
                # (constant) tuple, by creating a new copy of this vector
                # each time. This bug is fixed in later versions of foolscap.
                testv = tuple([0, 1, 'eq', ""])

            testvs = [testv]
            # the write vector is simply the share
            writev = [(0, self.shares[shnum])]

            if peerid not in all_tw_vectors:
                all_tw_vectors[peerid] = {}
                # maps shnum to (testvs, writevs, new_length)
            assert shnum not in all_tw_vectors[peerid]

            all_tw_vectors[peerid][shnum] = (testvs, writev, None)
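        # For illustration, after this loop all_tw_vectors has roughly the
        # shape:
        #   { peerid: { shnum: ( [ (offset, length, "eq", specimen) ],
        #                        [ (offset, share_data) ],
        #                        None ) } }
        # i.e. one test vector and one write vector per share, and no
        # new_length truncation.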

        # we read the checkstring back from each share; however, we only use
        # it to detect whether there was a new share that we didn't know
        # about. The success or failure of the write will tell us whether
        # there was a collision or not. If there is a collision, the first
        # thing we'll do is update the servermap, which will find out what
        # happened. We could conceivably reduce a roundtrip by using the
        # readv checkstring to populate the servermap, but really we'd have
        # to read enough data to validate the signatures too, so it wouldn't
        # be an overall win.
        read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]
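        # (That is, ask each server to return the first SIGNED_PREFIX-sized
        # region of every share it holds; _got_write_answer only unpacks the
        # checkstring portion of what comes back.)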

        # ok, send the messages!
        self.log("sending %d shares" % len(all_tw_vectors), level=log.NOISY)
        started = time.time()
        for (peerid, tw_vectors) in all_tw_vectors.items():

            write_enabler = self._node.get_write_enabler(peerid)
            renew_secret = self._node.get_renewal_secret(peerid)
            cancel_secret = self._node.get_cancel_secret(peerid)
            secrets = (write_enabler, renew_secret, cancel_secret)
            shnums = tw_vectors.keys()

            for shnum in shnums:
                self.outstanding.add( (peerid, shnum) )

            d = self._do_testreadwrite(peerid, secrets,
                                       tw_vectors, read_vector)
            d.addCallbacks(self._got_write_answer, self._got_write_error,
                           callbackArgs=(peerid, shnums, started),
                           errbackArgs=(peerid, shnums, started))
            d.addCallback(self.loop)
            d.addErrback(self._fatal_error)

        self._update_status()
        self.log("%d shares sent" % len(all_tw_vectors), level=log.NOISY)

    def _do_testreadwrite(self, peerid, secrets,
                          tw_vectors, read_vector):
        storage_index = self._storage_index
        ss = self.connections[peerid]

        #print "SS[%s] is %s" % (idlib.shortnodeid_b2a(peerid), ss), ss.tracker.interfaceName
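        # The answer from this call is unpacked by _got_write_answer as
        # (wrote, read_data): a boolean saying whether all test vectors
        # matched (and therefore whether the writes were applied), plus a
        # dict mapping each share number the server holds to its readv
        # results.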
        d = ss.callRemote("slot_testv_and_readv_and_writev",
                          storage_index,
                          secrets,
                          tw_vectors,
                          read_vector)
        return d

    def _got_write_answer(self, answer, peerid, shnums, started):
        lp = self.log("_got_write_answer from %s" %
                      idlib.shortnodeid_b2a(peerid))
        for shnum in shnums:
            self.outstanding.discard( (peerid, shnum) )

        now = time.time()
        elapsed = now - started
        self._status.add_per_server_time(peerid, elapsed)

        wrote, read_data = answer

        surprise_shares = set(read_data.keys()) - set(shnums)
        if surprise_shares:
            self.log("they had shares %s that we didn't know about" %
                     (list(surprise_shares),),
                     parent=lp, level=log.WEIRD)
            self.surprised = True

        if not wrote:
            # TODO: there are two possibilities. The first is that the server
            # is full (or just doesn't want to give us any room), which means
            # we shouldn't ask them again, but is *not* an indication of an
            # uncoordinated write. The second is that our testv failed, which
            # *does* indicate an uncoordinated write. We currently don't have
            # a way to tell these two apart (in fact, the storage server code
            # doesn't have the option of refusing our share).
            #
            # If the server is full, mark the peer as bad (so we don't ask
            # them again), but don't set self.surprised. The loop() will find
            # a new server.
            #
            # If the testv failed, log it, set self.surprised, but don't
            # bother adding to self.bad_peers .

            self.log("our testv failed, so the write did not happen",
                     parent=lp, level=log.WEIRD)
            self.surprised = True
            self.bad_peers.add(peerid) # don't ask them again
            # use the checkstring to add information to the log message
            for (shnum,readv) in read_data.items():
                checkstring = readv[0]
                (other_seqnum,
                 other_roothash,
                 other_salt) = unpack_checkstring(checkstring)
                expected_version = self._servermap.version_on_peer(peerid,
                                                                   shnum)
                if expected_version:
                    (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                     offsets_tuple) = expected_version
                    self.log("somebody modified the share on us:"
                             " shnum=%d: I thought they had #%d:R=%s,"
                             " but testv reported #%d:R=%s" %
                             (shnum,
                              seqnum, base32.b2a(root_hash)[:4],
                              other_seqnum, base32.b2a(other_roothash)[:4]),
                             parent=lp, level=log.NOISY)
                # if expected_version==None, then we didn't expect to see a
                # share on that peer, and the 'surprise_shares' clause above
                # will have logged it.
            # self.loop() will take care of finding new homes
            return

        for shnum in shnums:
            self.placed.add( (peerid, shnum) )
            # and update the servermap
            self._servermap.add_new_share(peerid, shnum,
                                          self.versioninfo, started)

        # self.loop() will take care of checking to see if we're done
        return

    def _got_write_error(self, f, peerid, shnums, started):
        for shnum in shnums:
            self.outstanding.discard( (peerid, shnum) )
        self.bad_peers.add(peerid)
        self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s",
                 shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid),
                 failure=f,
                 level=log.UNUSUAL)
        # self.loop() will take care of checking to see if we're done
        return


    def _done(self, res):
        if not self._running:
            return
        self._running = False
        now = time.time()
        self._status.timings["total"] = now - self._started
        self._status.set_active(False)
        if isinstance(res, failure.Failure):
            self.log("Publish done, with failure", failure=res, level=log.WEIRD)
            self._status.set_status("Failed")
        elif self.surprised:
            self.log("Publish done, UncoordinatedWriteError", level=log.UNUSUAL)
            self._status.set_status("UncoordinatedWriteError")
            # deliver a failure
            res = failure.Failure(UncoordinatedWriteError())
            # TODO: recovery
        else:
            self.log("Publish done, success")
            self._status.set_status("Done")
            self._status.set_progress(1.0)
        eventually(self.done_deferred.callback, res)