]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/publish.py
Refactor StorageFarmBroker handling of servers
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / publish.py
1
2
3 import os, struct, time
4 from itertools import count
5 from zope.interface import implements
6 from twisted.internet import defer
7 from twisted.python import failure
8 from allmydata.interfaces import IPublishStatus
9 from allmydata.util import base32, hashutil, mathutil, idlib, log
10 from allmydata.util.dictutil import DictOfSets
11 from allmydata import hashtree, codec
12 from allmydata.storage.server import si_b2a
13 from pycryptopp.cipher.aes import AES
14 from foolscap.api import eventually, fireEventually
15
16 from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, \
17      UncoordinatedWriteError, NotEnoughServersError
18 from allmydata.mutable.servermap import ServerMap
19 from allmydata.mutable.layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
20      unpack_checkstring, SIGNED_PREFIX
21
22 class PublishStatus:
23     implements(IPublishStatus)
24     statusid_counter = count(0)
25     def __init__(self):
26         self.timings = {}
27         self.timings["send_per_server"] = {}
28         self.servermap = None
29         self.problems = {}
30         self.active = True
31         self.storage_index = None
32         self.helper = False
33         self.encoding = ("?", "?")
34         self.size = None
35         self.status = "Not started"
36         self.progress = 0.0
37         self.counter = self.statusid_counter.next()
38         self.started = time.time()
39
40     def add_per_server_time(self, peerid, elapsed):
41         if peerid not in self.timings["send_per_server"]:
42             self.timings["send_per_server"][peerid] = []
43         self.timings["send_per_server"][peerid].append(elapsed)
44
45     def get_started(self):
46         return self.started
47     def get_storage_index(self):
48         return self.storage_index
49     def get_encoding(self):
50         return self.encoding
51     def using_helper(self):
52         return self.helper
53     def get_servermap(self):
54         return self.servermap
55     def get_size(self):
56         return self.size
57     def get_status(self):
58         return self.status
59     def get_progress(self):
60         return self.progress
61     def get_active(self):
62         return self.active
63     def get_counter(self):
64         return self.counter
65
66     def set_storage_index(self, si):
67         self.storage_index = si
68     def set_helper(self, helper):
69         self.helper = helper
70     def set_servermap(self, servermap):
71         self.servermap = servermap
72     def set_encoding(self, k, n):
73         self.encoding = (k, n)
74     def set_size(self, size):
75         self.size = size
76     def set_status(self, status):
77         self.status = status
78     def set_progress(self, value):
79         self.progress = value
80     def set_active(self, value):
81         self.active = value
82
83 class LoopLimitExceededError(Exception):
84     pass
85
86 class Publish:
87     """I represent a single act of publishing the mutable file to the grid. I
88     will only publish my data if the servermap I am using still represents
89     the current state of the world.
90
91     To make the initial publish, set servermap to None.
92     """
93
94     def __init__(self, filenode, storage_broker, servermap):
95         self._node = filenode
96         self._storage_broker = storage_broker
97         self._servermap = servermap
98         self._storage_index = self._node.get_storage_index()
99         self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
100         num = self.log("Publish(%s): starting" % prefix, parent=None)
101         self._log_number = num
102         self._running = True
103         self._first_write_error = None
104
105         self._status = PublishStatus()
106         self._status.set_storage_index(self._storage_index)
107         self._status.set_helper(False)
108         self._status.set_progress(0.0)
109         self._status.set_active(True)
110
111     def get_status(self):
112         return self._status
113
114     def log(self, *args, **kwargs):
115         if 'parent' not in kwargs:
116             kwargs['parent'] = self._log_number
117         if "facility" not in kwargs:
118             kwargs["facility"] = "tahoe.mutable.publish"
119         return log.msg(*args, **kwargs)
120
121     def publish(self, newdata):
122         """Publish the filenode's current contents.  Returns a Deferred that
123         fires (with None) when the publish has done as much work as it's ever
124         going to do, or errbacks with ConsistencyError if it detects a
125         simultaneous write.
126         """
127
128         # 1: generate shares (SDMF: files are small, so we can do it in RAM)
129         # 2: perform peer selection, get candidate servers
130         #  2a: send queries to n+epsilon servers, to determine current shares
131         #  2b: based upon responses, create target map
132         # 3: send slot_testv_and_readv_and_writev messages
133         # 4: as responses return, update share-dispatch table
134         # 4a: may need to run recovery algorithm
135         # 5: when enough responses are back, we're done
136
137         self.log("starting publish, datalen is %s" % len(newdata))
138         self._status.set_size(len(newdata))
139         self._status.set_status("Started")
140         self._started = time.time()
141
142         self.done_deferred = defer.Deferred()
143
144         self._writekey = self._node.get_writekey()
145         assert self._writekey, "need write capability to publish"
146
147         # first, which servers will we publish to? We require that the
148         # servermap was updated in MODE_WRITE, so we can depend upon the
149         # peerlist computed by that process instead of computing our own.
150         if self._servermap:
151             assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
152             # we will push a version that is one larger than anything present
153             # in the grid, according to the servermap.
154             self._new_seqnum = self._servermap.highest_seqnum() + 1
155         else:
156             # If we don't have a servermap, that's because we're doing the
157             # initial publish
158             self._new_seqnum = 1
159             self._servermap = ServerMap()
160         self._status.set_servermap(self._servermap)
161
162         self.log(format="new seqnum will be %(seqnum)d",
163                  seqnum=self._new_seqnum, level=log.NOISY)
164
165         # having an up-to-date servermap (or using a filenode that was just
166         # created for the first time) also guarantees that the following
167         # fields are available
168         self.readkey = self._node.get_readkey()
169         self.required_shares = self._node.get_required_shares()
170         assert self.required_shares is not None
171         self.total_shares = self._node.get_total_shares()
172         assert self.total_shares is not None
173         self._status.set_encoding(self.required_shares, self.total_shares)
174
175         self._pubkey = self._node.get_pubkey()
176         assert self._pubkey
177         self._privkey = self._node.get_privkey()
178         assert self._privkey
179         self._encprivkey = self._node.get_encprivkey()
180
181         sb = self._storage_broker
182         full_peerlist = [(s.get_serverid(), s.get_rref())
183                          for s in sb.get_servers_for_psi(self._storage_index)]
184         self.full_peerlist = full_peerlist # for use later, immutable
185         self.bad_peers = set() # peerids who have errbacked/refused requests
186
187         self.newdata = newdata
188         self.salt = os.urandom(16)
189
190         self.setup_encoding_parameters()
191
192         # if we experience any surprises (writes which were rejected because
193         # our test vector did not match, or shares which we didn't expect to
194         # see), we set this flag and report an UncoordinatedWriteError at the
195         # end of the publish process.
196         self.surprised = False
197
198         # as a failsafe, refuse to iterate through self.loop more than a
199         # thousand times.
200         self.looplimit = 1000
201
202         # we keep track of three tables. The first is our goal: which share
203         # we want to see on which servers. This is initially populated by the
204         # existing servermap.
205         self.goal = set() # pairs of (peerid, shnum) tuples
206
207         # the second table is our list of outstanding queries: those which
208         # are in flight and may or may not be delivered, accepted, or
209         # acknowledged. Items are added to this table when the request is
210         # sent, and removed when the response returns (or errbacks).
211         self.outstanding = set() # (peerid, shnum) tuples
212
213         # the third is a table of successes: share which have actually been
214         # placed. These are populated when responses come back with success.
215         # When self.placed == self.goal, we're done.
216         self.placed = set() # (peerid, shnum) tuples
217
218         # we also keep a mapping from peerid to RemoteReference. Each time we
219         # pull a connection out of the full peerlist, we add it to this for
220         # use later.
221         self.connections = {}
222
223         self.bad_share_checkstrings = {}
224
225         # we use the servermap to populate the initial goal: this way we will
226         # try to update each existing share in place.
227         for (peerid, shnum) in self._servermap.servermap:
228             self.goal.add( (peerid, shnum) )
229             self.connections[peerid] = self._servermap.connections[peerid]
230         # then we add in all the shares that were bad (corrupted, bad
231         # signatures, etc). We want to replace these.
232         for key, old_checkstring in self._servermap.bad_shares.items():
233             (peerid, shnum) = key
234             self.goal.add(key)
235             self.bad_share_checkstrings[key] = old_checkstring
236             self.connections[peerid] = self._servermap.connections[peerid]
237
238         # create the shares. We'll discard these as they are delivered. SDMF:
239         # we're allowed to hold everything in memory.
240
241         self._status.timings["setup"] = time.time() - self._started
242         d = self._encrypt_and_encode()
243         d.addCallback(self._generate_shares)
244         def _start_pushing(res):
245             self._started_pushing = time.time()
246             return res
247         d.addCallback(_start_pushing)
248         d.addCallback(self.loop) # trigger delivery
249         d.addErrback(self._fatal_error)
250
251         return self.done_deferred
252
253     def setup_encoding_parameters(self):
254         segment_size = len(self.newdata)
255         # this must be a multiple of self.required_shares
256         segment_size = mathutil.next_multiple(segment_size,
257                                               self.required_shares)
258         self.segment_size = segment_size
259         if segment_size:
260             self.num_segments = mathutil.div_ceil(len(self.newdata),
261                                                   segment_size)
262         else:
263             self.num_segments = 0
264         assert self.num_segments in [0, 1,] # SDMF restrictions
265
266     def _fatal_error(self, f):
267         self.log("error during loop", failure=f, level=log.UNUSUAL)
268         self._done(f)
269
270     def _update_status(self):
271         self._status.set_status("Sending Shares: %d placed out of %d, "
272                                 "%d messages outstanding" %
273                                 (len(self.placed),
274                                  len(self.goal),
275                                  len(self.outstanding)))
276         self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
277
278     def loop(self, ignored=None):
279         self.log("entering loop", level=log.NOISY)
280         if not self._running:
281             return
282
283         self.looplimit -= 1
284         if self.looplimit <= 0:
285             raise LoopLimitExceededError("loop limit exceeded")
286
287         if self.surprised:
288             # don't send out any new shares, just wait for the outstanding
289             # ones to be retired.
290             self.log("currently surprised, so don't send any new shares",
291                      level=log.NOISY)
292         else:
293             self.update_goal()
294             # how far are we from our goal?
295             needed = self.goal - self.placed - self.outstanding
296             self._update_status()
297
298             if needed:
299                 # we need to send out new shares
300                 self.log(format="need to send %(needed)d new shares",
301                          needed=len(needed), level=log.NOISY)
302                 self._send_shares(needed)
303                 return
304
305         if self.outstanding:
306             # queries are still pending, keep waiting
307             self.log(format="%(outstanding)d queries still outstanding",
308                      outstanding=len(self.outstanding),
309                      level=log.NOISY)
310             return
311
312         # no queries outstanding, no placements needed: we're done
313         self.log("no queries outstanding, no placements needed: done",
314                  level=log.OPERATIONAL)
315         now = time.time()
316         elapsed = now - self._started_pushing
317         self._status.timings["push"] = elapsed
318         return self._done(None)
319
320     def log_goal(self, goal, message=""):
321         logmsg = [message]
322         for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]):
323             logmsg.append("sh%d to [%s]" % (shnum,
324                                             idlib.shortnodeid_b2a(peerid)))
325         self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
326         self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
327                  level=log.NOISY)
328
329     def update_goal(self):
330         # if log.recording_noisy
331         if True:
332             self.log_goal(self.goal, "before update: ")
333
334         # first, remove any bad peers from our goal
335         self.goal = set([ (peerid, shnum)
336                           for (peerid, shnum) in self.goal
337                           if peerid not in self.bad_peers ])
338
339         # find the homeless shares:
340         homefull_shares = set([shnum for (peerid, shnum) in self.goal])
341         homeless_shares = set(range(self.total_shares)) - homefull_shares
342         homeless_shares = sorted(list(homeless_shares))
343         # place them somewhere. We prefer unused servers at the beginning of
344         # the available peer list.
345
346         if not homeless_shares:
347             return
348
349         # if an old share X is on a node, put the new share X there too.
350         # TODO: 1: redistribute shares to achieve one-per-peer, by copying
351         #       shares from existing peers to new (less-crowded) ones. The
352         #       old shares must still be updated.
353         # TODO: 2: move those shares instead of copying them, to reduce future
354         #       update work
355
356         # this is a bit CPU intensive but easy to analyze. We create a sort
357         # order for each peerid. If the peerid is marked as bad, we don't
358         # even put them in the list. Then we care about the number of shares
359         # which have already been assigned to them. After that we care about
360         # their permutation order.
361         old_assignments = DictOfSets()
362         for (peerid, shnum) in self.goal:
363             old_assignments.add(peerid, shnum)
364
365         peerlist = []
366         for i, (peerid, ss) in enumerate(self.full_peerlist):
367             if peerid in self.bad_peers:
368                 continue
369             entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
370             peerlist.append(entry)
371         peerlist.sort()
372
373         if not peerlist:
374             raise NotEnoughServersError("Ran out of non-bad servers, "
375                                         "first_error=%s" %
376                                         str(self._first_write_error),
377                                         self._first_write_error)
378
379         # we then index this peerlist with an integer, because we may have to
380         # wrap. We update the goal as we go.
381         i = 0
382         for shnum in homeless_shares:
383             (ignored1, ignored2, peerid, ss) = peerlist[i]
384             # if we are forced to send a share to a server that already has
385             # one, we may have two write requests in flight, and the
386             # servermap (which was computed before either request was sent)
387             # won't reflect the new shares, so the second response will be
388             # surprising. There is code in _got_write_answer() to tolerate
389             # this, otherwise it would cause the publish to fail with an
390             # UncoordinatedWriteError. See #546 for details of the trouble
391             # this used to cause.
392             self.goal.add( (peerid, shnum) )
393             self.connections[peerid] = ss
394             i += 1
395             if i >= len(peerlist):
396                 i = 0
397         if True:
398             self.log_goal(self.goal, "after update: ")
399
400
401
402     def _encrypt_and_encode(self):
403         # this returns a Deferred that fires with a list of (sharedata,
404         # sharenum) tuples. TODO: cache the ciphertext, only produce the
405         # shares that we care about.
406         self.log("_encrypt_and_encode")
407
408         self._status.set_status("Encrypting")
409         started = time.time()
410
411         key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
412         enc = AES(key)
413         crypttext = enc.process(self.newdata)
414         assert len(crypttext) == len(self.newdata)
415
416         now = time.time()
417         self._status.timings["encrypt"] = now - started
418         started = now
419
420         # now apply FEC
421
422         self._status.set_status("Encoding")
423         fec = codec.CRSEncoder()
424         fec.set_params(self.segment_size,
425                        self.required_shares, self.total_shares)
426         piece_size = fec.get_block_size()
427         crypttext_pieces = [None] * self.required_shares
428         for i in range(len(crypttext_pieces)):
429             offset = i * piece_size
430             piece = crypttext[offset:offset+piece_size]
431             piece = piece + "\x00"*(piece_size - len(piece)) # padding
432             crypttext_pieces[i] = piece
433             assert len(piece) == piece_size
434
435         d = fec.encode(crypttext_pieces)
436         def _done_encoding(res):
437             elapsed = time.time() - started
438             self._status.timings["encode"] = elapsed
439             return res
440         d.addCallback(_done_encoding)
441         return d
442
443     def _generate_shares(self, shares_and_shareids):
444         # this sets self.shares and self.root_hash
445         self.log("_generate_shares")
446         self._status.set_status("Generating Shares")
447         started = time.time()
448
449         # we should know these by now
450         privkey = self._privkey
451         encprivkey = self._encprivkey
452         pubkey = self._pubkey
453
454         (shares, share_ids) = shares_and_shareids
455
456         assert len(shares) == len(share_ids)
457         assert len(shares) == self.total_shares
458         all_shares = {}
459         block_hash_trees = {}
460         share_hash_leaves = [None] * len(shares)
461         for i in range(len(shares)):
462             share_data = shares[i]
463             shnum = share_ids[i]
464             all_shares[shnum] = share_data
465
466             # build the block hash tree. SDMF has only one leaf.
467             leaves = [hashutil.block_hash(share_data)]
468             t = hashtree.HashTree(leaves)
469             block_hash_trees[shnum] = list(t)
470             share_hash_leaves[shnum] = t[0]
471         for leaf in share_hash_leaves:
472             assert leaf is not None
473         share_hash_tree = hashtree.HashTree(share_hash_leaves)
474         share_hash_chain = {}
475         for shnum in range(self.total_shares):
476             needed_hashes = share_hash_tree.needed_hashes(shnum)
477             share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
478                                               for i in needed_hashes ] )
479         root_hash = share_hash_tree[0]
480         assert len(root_hash) == 32
481         self.log("my new root_hash is %s" % base32.b2a(root_hash))
482         self._new_version_info = (self._new_seqnum, root_hash, self.salt)
483
484         prefix = pack_prefix(self._new_seqnum, root_hash, self.salt,
485                              self.required_shares, self.total_shares,
486                              self.segment_size, len(self.newdata))
487
488         # now pack the beginning of the share. All shares are the same up
489         # to the signature, then they have divergent share hash chains,
490         # then completely different block hash trees + salt + share data,
491         # then they all share the same encprivkey at the end. The sizes
492         # of everything are the same for all shares.
493
494         sign_started = time.time()
495         signature = privkey.sign(prefix)
496         self._status.timings["sign"] = time.time() - sign_started
497
498         verification_key = pubkey.serialize()
499
500         final_shares = {}
501         for shnum in range(self.total_shares):
502             final_share = pack_share(prefix,
503                                      verification_key,
504                                      signature,
505                                      share_hash_chain[shnum],
506                                      block_hash_trees[shnum],
507                                      all_shares[shnum],
508                                      encprivkey)
509             final_shares[shnum] = final_share
510         elapsed = time.time() - started
511         self._status.timings["pack"] = elapsed
512         self.shares = final_shares
513         self.root_hash = root_hash
514
515         # we also need to build up the version identifier for what we're
516         # pushing. Extract the offsets from one of our shares.
517         assert final_shares
518         offsets = unpack_header(final_shares.values()[0])[-1]
519         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
520         verinfo = (self._new_seqnum, root_hash, self.salt,
521                    self.segment_size, len(self.newdata),
522                    self.required_shares, self.total_shares,
523                    prefix, offsets_tuple)
524         self.versioninfo = verinfo
525
526
527
528     def _send_shares(self, needed):
529         self.log("_send_shares")
530
531         # we're finally ready to send out our shares. If we encounter any
532         # surprises here, it's because somebody else is writing at the same
533         # time. (Note: in the future, when we remove the _query_peers() step
534         # and instead speculate about [or remember] which shares are where,
535         # surprises here are *not* indications of UncoordinatedWriteError,
536         # and we'll need to respond to them more gracefully.)
537
538         # needed is a set of (peerid, shnum) tuples. The first thing we do is
539         # organize it by peerid.
540
541         peermap = DictOfSets()
542         for (peerid, shnum) in needed:
543             peermap.add(peerid, shnum)
544
545         # the next thing is to build up a bunch of test vectors. The
546         # semantics of Publish are that we perform the operation if the world
547         # hasn't changed since the ServerMap was constructed (more or less).
548         # For every share we're trying to place, we create a test vector that
549         # tests to see if the server*share still corresponds to the
550         # map.
551
552         all_tw_vectors = {} # maps peerid to tw_vectors
553         sm = self._servermap.servermap
554
555         for key in needed:
556             (peerid, shnum) = key
557
558             if key in sm:
559                 # an old version of that share already exists on the
560                 # server, according to our servermap. We will create a
561                 # request that attempts to replace it.
562                 old_versionid, old_timestamp = sm[key]
563                 (old_seqnum, old_root_hash, old_salt, old_segsize,
564                  old_datalength, old_k, old_N, old_prefix,
565                  old_offsets_tuple) = old_versionid
566                 old_checkstring = pack_checkstring(old_seqnum,
567                                                    old_root_hash,
568                                                    old_salt)
569                 testv = (0, len(old_checkstring), "eq", old_checkstring)
570
571             elif key in self.bad_share_checkstrings:
572                 old_checkstring = self.bad_share_checkstrings[key]
573                 testv = (0, len(old_checkstring), "eq", old_checkstring)
574
575             else:
576                 # add a testv that requires the share not exist
577
578                 # Unfortunately, foolscap-0.2.5 has a bug in the way inbound
579                 # constraints are handled. If the same object is referenced
580                 # multiple times inside the arguments, foolscap emits a
581                 # 'reference' token instead of a distinct copy of the
582                 # argument. The bug is that these 'reference' tokens are not
583                 # accepted by the inbound constraint code. To work around
584                 # this, we need to prevent python from interning the
585                 # (constant) tuple, by creating a new copy of this vector
586                 # each time.
587
588                 # This bug is fixed in foolscap-0.2.6, and even though this
589                 # version of Tahoe requires foolscap-0.3.1 or newer, we are
590                 # supposed to be able to interoperate with older versions of
591                 # Tahoe which are allowed to use older versions of foolscap,
592                 # including foolscap-0.2.5 . In addition, I've seen other
593                 # foolscap problems triggered by 'reference' tokens (see #541
594                 # for details). So we must keep this workaround in place.
595
596                 #testv = (0, 1, 'eq', "")
597                 testv = tuple([0, 1, 'eq', ""])
598
599             testvs = [testv]
600             # the write vector is simply the share
601             writev = [(0, self.shares[shnum])]
602
603             if peerid not in all_tw_vectors:
604                 all_tw_vectors[peerid] = {}
605                 # maps shnum to (testvs, writevs, new_length)
606             assert shnum not in all_tw_vectors[peerid]
607
608             all_tw_vectors[peerid][shnum] = (testvs, writev, None)
609
610         # we read the checkstring back from each share, however we only use
611         # it to detect whether there was a new share that we didn't know
612         # about. The success or failure of the write will tell us whether
613         # there was a collision or not. If there is a collision, the first
614         # thing we'll do is update the servermap, which will find out what
615         # happened. We could conceivably reduce a roundtrip by using the
616         # readv checkstring to populate the servermap, but really we'd have
617         # to read enough data to validate the signatures too, so it wouldn't
618         # be an overall win.
619         read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]
620
621         # ok, send the messages!
622         self.log("sending %d shares" % len(all_tw_vectors), level=log.NOISY)
623         started = time.time()
624         for (peerid, tw_vectors) in all_tw_vectors.items():
625
626             write_enabler = self._node.get_write_enabler(peerid)
627             renew_secret = self._node.get_renewal_secret(peerid)
628             cancel_secret = self._node.get_cancel_secret(peerid)
629             secrets = (write_enabler, renew_secret, cancel_secret)
630             shnums = tw_vectors.keys()
631
632             for shnum in shnums:
633                 self.outstanding.add( (peerid, shnum) )
634
635             d = self._do_testreadwrite(peerid, secrets,
636                                        tw_vectors, read_vector)
637             d.addCallbacks(self._got_write_answer, self._got_write_error,
638                            callbackArgs=(peerid, shnums, started),
639                            errbackArgs=(peerid, shnums, started))
640             # tolerate immediate errback, like with DeadReferenceError
641             d.addBoth(fireEventually)
642             d.addCallback(self.loop)
643             d.addErrback(self._fatal_error)
644
645         self._update_status()
646         self.log("%d shares sent" % len(all_tw_vectors), level=log.NOISY)
647
648     def _do_testreadwrite(self, peerid, secrets,
649                           tw_vectors, read_vector):
650         storage_index = self._storage_index
651         ss = self.connections[peerid]
652
653         #print "SS[%s] is %s" % (idlib.shortnodeid_b2a(peerid), ss), ss.tracker.interfaceName
654         d = ss.callRemote("slot_testv_and_readv_and_writev",
655                           storage_index,
656                           secrets,
657                           tw_vectors,
658                           read_vector)
659         return d
660
661     def _got_write_answer(self, answer, peerid, shnums, started):
662         lp = self.log("_got_write_answer from %s" %
663                       idlib.shortnodeid_b2a(peerid))
664         for shnum in shnums:
665             self.outstanding.discard( (peerid, shnum) )
666
667         now = time.time()
668         elapsed = now - started
669         self._status.add_per_server_time(peerid, elapsed)
670
671         wrote, read_data = answer
672
673         surprise_shares = set(read_data.keys()) - set(shnums)
674
675         surprised = False
676         for shnum in surprise_shares:
677             # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
678             checkstring = read_data[shnum][0]
679             their_version_info = unpack_checkstring(checkstring)
680             if their_version_info == self._new_version_info:
681                 # they have the right share, somehow
682
683                 if (peerid,shnum) in self.goal:
684                     # and we want them to have it, so we probably sent them a
685                     # copy in an earlier write. This is ok, and avoids the
686                     # #546 problem.
687                     continue
688
689                 # They aren't in our goal, but they are still for the right
690                 # version. Somebody else wrote them, and it's a convergent
691                 # uncoordinated write. Pretend this is ok (don't be
692                 # surprised), since I suspect there's a decent chance that
693                 # we'll hit this in normal operation.
694                 continue
695
696             else:
697                 # the new shares are of a different version
698                 if peerid in self._servermap.reachable_peers:
699                     # we asked them about their shares, so we had knowledge
700                     # of what they used to have. Any surprising shares must
701                     # have come from someone else, so UCW.
702                     surprised = True
703                 else:
704                     # we didn't ask them, and now we've discovered that they
705                     # have a share we didn't know about. This indicates that
706                     # mapupdate should have wokred harder and asked more
707                     # servers before concluding that it knew about them all.
708
709                     # signal UCW, but make sure to ask this peer next time,
710                     # so we'll remember to update it if/when we retry.
711                     surprised = True
712                     # TODO: ask this peer next time. I don't yet have a good
713                     # way to do this. Two insufficient possibilities are:
714                     #
715                     # self._servermap.add_new_share(peerid, shnum, verinfo, now)
716                     #  but that requires fetching/validating/parsing the whole
717                     #  version string, and all we have is the checkstring
718                     # self._servermap.mark_bad_share(peerid, shnum, checkstring)
719                     #  that will make publish overwrite the share next time,
720                     #  but it won't re-query the server, and it won't make
721                     #  mapupdate search further
722
723                     # TODO later: when publish starts, do
724                     # servermap.get_best_version(), extract the seqnum,
725                     # subtract one, and store as highest-replaceable-seqnum.
726                     # Then, if this surprise-because-we-didn't-ask share is
727                     # of highest-replaceable-seqnum or lower, we're allowed
728                     # to replace it: send out a new writev (or rather add it
729                     # to self.goal and loop).
730                     pass
731
732                 surprised = True
733
734         if surprised:
735             self.log("they had shares %s that we didn't know about" %
736                      (list(surprise_shares),),
737                      parent=lp, level=log.WEIRD, umid="un9CSQ")
738             self.surprised = True
739
740         if not wrote:
741             # TODO: there are two possibilities. The first is that the server
742             # is full (or just doesn't want to give us any room), which means
743             # we shouldn't ask them again, but is *not* an indication of an
744             # uncoordinated write. The second is that our testv failed, which
745             # *does* indicate an uncoordinated write. We currently don't have
746             # a way to tell these two apart (in fact, the storage server code
747             # doesn't have the option of refusing our share).
748             #
749             # If the server is full, mark the peer as bad (so we don't ask
750             # them again), but don't set self.surprised. The loop() will find
751             # a new server.
752             #
753             # If the testv failed, log it, set self.surprised, but don't
754             # bother adding to self.bad_peers .
755
756             self.log("our testv failed, so the write did not happen",
757                      parent=lp, level=log.WEIRD, umid="8sc26g")
758             self.surprised = True
759             self.bad_peers.add(peerid) # don't ask them again
760             # use the checkstring to add information to the log message
761             for (shnum,readv) in read_data.items():
762                 checkstring = readv[0]
763                 (other_seqnum,
764                  other_roothash,
765                  other_salt) = unpack_checkstring(checkstring)
766                 expected_version = self._servermap.version_on_peer(peerid,
767                                                                    shnum)
768                 if expected_version:
769                     (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
770                      offsets_tuple) = expected_version
771                     self.log("somebody modified the share on us:"
772                              " shnum=%d: I thought they had #%d:R=%s,"
773                              " but testv reported #%d:R=%s" %
774                              (shnum,
775                               seqnum, base32.b2a(root_hash)[:4],
776                               other_seqnum, base32.b2a(other_roothash)[:4]),
777                              parent=lp, level=log.NOISY)
778                 # if expected_version==None, then we didn't expect to see a
779                 # share on that peer, and the 'surprise_shares' clause above
780                 # will have logged it.
781             # self.loop() will take care of finding new homes
782             return
783
784         for shnum in shnums:
785             self.placed.add( (peerid, shnum) )
786             # and update the servermap
787             self._servermap.add_new_share(peerid, shnum,
788                                           self.versioninfo, started)
789
790         # self.loop() will take care of checking to see if we're done
791         return
792
793     def _got_write_error(self, f, peerid, shnums, started):
794         for shnum in shnums:
795             self.outstanding.discard( (peerid, shnum) )
796         self.bad_peers.add(peerid)
797         if self._first_write_error is None:
798             self._first_write_error = f
799         self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s",
800                  shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid),
801                  failure=f,
802                  level=log.UNUSUAL)
803         # self.loop() will take care of checking to see if we're done
804         return
805
806
807     def _done(self, res):
808         if not self._running:
809             return
810         self._running = False
811         now = time.time()
812         self._status.timings["total"] = now - self._started
813         self._status.set_active(False)
814         if isinstance(res, failure.Failure):
815             self.log("Publish done, with failure", failure=res,
816                      level=log.WEIRD, umid="nRsR9Q")
817             self._status.set_status("Failed")
818         elif self.surprised:
819             self.log("Publish done, UncoordinatedWriteError", level=log.UNUSUAL)
820             self._status.set_status("UncoordinatedWriteError")
821             # deliver a failure
822             res = failure.Failure(UncoordinatedWriteError())
823             # TODO: recovery
824         else:
825             self.log("Publish done, success")
826             self._status.set_status("Finished")
827             self._status.set_progress(1.0)
828         eventually(self.done_deferred.callback, res)
829
830