]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/publish.py
mutable publish: if we are surprised by shares that match what we would have written...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / publish.py
1
2
3 import os, struct, time
4 from itertools import count
5 from zope.interface import implements
6 from twisted.internet import defer
7 from twisted.python import failure
8 from allmydata.interfaces import IPublishStatus, FileTooLargeError
9 from allmydata.util import base32, hashutil, mathutil, idlib, log
10 from allmydata import hashtree, codec, storage
11 from pycryptopp.cipher.aes import AES
12 from foolscap.eventual import eventually
13
14 from common import MODE_WRITE, MODE_CHECK, DictOfSets, \
15      UncoordinatedWriteError, NotEnoughServersError
16 from servermap import ServerMap
17 from layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
18      unpack_checkstring, SIGNED_PREFIX
19
class PublishStatus:
    """Mutable record of the progress and timings of one publish operation.

    Each instance draws a unique number from the class-wide
    `statusid_counter` and remembers the wall-clock time at which it was
    created. Everything else is plain state that the publisher writes
    through the set_* methods and that observers read through the get_*
    methods.
    """
    implements(IPublishStatus)
    statusid_counter = count(0)

    def __init__(self):
        # timings maps a phase name to an elapsed time; the special
        # "send_per_server" entry maps peerid -> list of elapsed times
        self.timings = {"send_per_server": {}}
        self.servermap = None
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        # (k, n); "?" until set_encoding() is called
        self.encoding = ("?", "?")
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def add_per_server_time(self, peerid, elapsed):
        """Append `elapsed` to the list of send times kept for `peerid`."""
        per_server = self.timings["send_per_server"]
        per_server.setdefault(peerid, []).append(elapsed)

    # --- read accessors ---

    def get_started(self):
        """Return the time.time() value captured at construction."""
        return self.started

    def get_storage_index(self):
        """Return the storage index, or None if it was never set."""
        return self.storage_index

    def get_encoding(self):
        """Return the (k, n) encoding tuple."""
        return self.encoding

    def using_helper(self):
        """Return the helper flag."""
        return self.helper

    def get_servermap(self):
        """Return the servermap, or None if it was never set."""
        return self.servermap

    def get_size(self):
        """Return the recorded size, or None if it was never set."""
        return self.size

    def get_status(self):
        """Return the current status string."""
        return self.status

    def get_progress(self):
        """Return the current progress value."""
        return self.progress

    def get_active(self):
        """Return the active flag."""
        return self.active

    def get_counter(self):
        """Return the unique number assigned to this status object."""
        return self.counter

    # --- write accessors ---

    def set_storage_index(self, si):
        """Record the storage index."""
        self.storage_index = si

    def set_helper(self, helper):
        """Record the helper flag."""
        self.helper = helper

    def set_servermap(self, servermap):
        """Record the servermap."""
        self.servermap = servermap

    def set_encoding(self, k, n):
        """Record the encoding parameters as the tuple (k, n)."""
        self.encoding = (k, n)

    def set_size(self, size):
        """Record the size."""
        self.size = size

    def set_status(self, status):
        """Record the status string."""
        self.status = status

    def set_progress(self, value):
        """Record the progress value."""
        self.progress = value

    def set_active(self, value):
        """Record the active flag."""
        self.active = value
80
81 class Publish:
82     """I represent a single act of publishing the mutable file to the grid. I
83     will only publish my data if the servermap I am using still represents
84     the current state of the world.
85
86     To make the initial publish, set servermap to None.
87     """
88
89     # we limit the segment size as usual to constrain our memory footprint.
90     # The max segsize is higher for mutable files, because we want to support
91     # dirnodes with up to 10k children, and each child uses about 330 bytes.
92     # If you actually put that much into a directory you'll be using a
93     # footprint of around 14MB, which is higher than we'd like, but it is
94     # more important right now to support large directories than to make
95     # memory usage small when you use them. Once we implement MDMF (with
96     # multiple segments), we will drop this back down, probably to 128KiB.
97     MAX_SEGMENT_SIZE = 3500000
98
99     def __init__(self, filenode, servermap):
100         self._node = filenode
101         self._servermap = servermap
102         self._storage_index = self._node.get_storage_index()
103         self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
104         num = self._node._client.log("Publish(%s): starting" % prefix)
105         self._log_number = num
106         self._running = True
107
108         self._status = PublishStatus()
109         self._status.set_storage_index(self._storage_index)
110         self._status.set_helper(False)
111         self._status.set_progress(0.0)
112         self._status.set_active(True)
113
114     def get_status(self):
115         return self._status
116
117     def log(self, *args, **kwargs):
118         if 'parent' not in kwargs:
119             kwargs['parent'] = self._log_number
120         if "facility" not in kwargs:
121             kwargs["facility"] = "tahoe.mutable.publish"
122         return log.msg(*args, **kwargs)
123
    def publish(self, newdata):
        """Publish `newdata` as the next version of the mutable file.

        Returns self.done_deferred, a Deferred created here and handed back
        as soon as the encrypt/encode/push pipeline has been started. The
        stated contract is that it fires (with None) when the publish has
        done as much work as it's ever going to do, or errbacks if it
        detects a simultaneous write.

        NOTE(review): this docstring used to name ConsistencyError as the
        error; the comments in the body below name UncoordinatedWriteError
        instead. The delivery happens in _done(), which is not part of this
        method -- confirm which exception callers actually see.

        Raises FileTooLargeError directly (synchronously, not through the
        Deferred) when len(newdata) exceeds MAX_SEGMENT_SIZE.
        """

        # 1: generate shares (SDMF: files are small, so we can do it in RAM)
        # 2: perform peer selection, get candidate servers
        #  2a: send queries to n+epsilon servers, to determine current shares
        #  2b: based upon responses, create target map
        # 3: send slot_testv_and_readv_and_writev messages
        # 4: as responses return, update share-dispatch table
        # 4a: may need to run recovery algorithm
        # 5: when enough responses are back, we're done

        self.log("starting publish, datalen is %s" % len(newdata))
        if len(newdata) > self.MAX_SEGMENT_SIZE:
            raise FileTooLargeError("SDMF is limited to one segment, and "
                                    "%d > %d" % (len(newdata),
                                                 self.MAX_SEGMENT_SIZE))
        self._status.set_size(len(newdata))
        self._status.set_status("Started")
        # reference point for the "setup" timing recorded further down
        self._started = time.time()

        self.done_deferred = defer.Deferred()

        self._writekey = self._node.get_writekey()
        assert self._writekey, "need write capability to publish"

        # first, which servers will we publish to? We require that the
        # servermap was updated in MODE_WRITE, so we can depend upon the
        # peerlist computed by that process instead of computing our own.
        # (Note that the assert below also accepts MODE_CHECK.)
        if self._servermap:
            assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
            # we will push a version that is one larger than anything present
            # in the grid, according to the servermap.
            self._new_seqnum = self._servermap.highest_seqnum() + 1
        else:
            # If we don't have a servermap, that's because we're doing the
            # initial publish
            self._new_seqnum = 1
            self._servermap = ServerMap()
        self._status.set_servermap(self._servermap)

        self.log(format="new seqnum will be %(seqnum)d",
                 seqnum=self._new_seqnum, level=log.NOISY)

        # having an up-to-date servermap (or using a filenode that was just
        # created for the first time) also guarantees that the following
        # fields are available
        self.readkey = self._node.get_readkey()
        self.required_shares = self._node.get_required_shares()
        assert self.required_shares is not None
        self.total_shares = self._node.get_total_shares()
        assert self.total_shares is not None
        self._status.set_encoding(self.required_shares, self.total_shares)

        self._pubkey = self._node.get_pubkey()
        assert self._pubkey
        self._privkey = self._node.get_privkey()
        assert self._privkey
        # unlike the two keys above, the encrypted private key is not
        # asserted to be present
        self._encprivkey = self._node.get_encprivkey()

        client = self._node._client
        full_peerlist = client.get_permuted_peers("storage",
                                                  self._storage_index)
        self.full_peerlist = full_peerlist # for use later, immutable
        self.bad_peers = set() # peerids who have errbacked/refused requests

        self.newdata = newdata
        # a fresh 16-byte random salt is chosen for every publish
        self.salt = os.urandom(16)

        # computes self.segment_size and self.num_segments from self.newdata
        # and self.required_shares, so it must run after those are set
        self.setup_encoding_parameters()

        # if we experience any surprises (writes which were rejected because
        # our test vector did not match, or shares which we didn't expect to
        # see), we set this flag and report an UncoordinatedWriteError at the
        # end of the publish process.
        self.surprised = False

        # as a failsafe, refuse to iterate through self.loop more than a
        # thousand times.
        self.looplimit = 1000

        # we keep track of three tables. The first is our goal: which share
        # we want to see on which servers. This is initially populated by the
        # existing servermap.
        self.goal = set() # pairs of (peerid, shnum) tuples

        # the second table is our list of outstanding queries: those which
        # are in flight and may or may not be delivered, accepted, or
        # acknowledged. Items are added to this table when the request is
        # sent, and removed when the response returns (or errbacks).
        self.outstanding = set() # (peerid, shnum) tuples

        # the third is a table of successes: share which have actually been
        # placed. These are populated when responses come back with success.
        # When self.placed == self.goal, we're done.
        self.placed = set() # (peerid, shnum) tuples

        # we also keep a mapping from peerid to RemoteReference. Each time we
        # pull a connection out of the full peerlist, we add it to this for
        # use later.
        self.connections = {}

        # maps (peerid, shnum) to the old checkstring of a share the
        # servermap recorded as bad; _send_shares uses it as the test vector
        self.bad_share_checkstrings = {}

        # we use the servermap to populate the initial goal: this way we will
        # try to update each existing share in place.
        for (peerid, shnum) in self._servermap.servermap:
            self.goal.add( (peerid, shnum) )
            self.connections[peerid] = self._servermap.connections[peerid]
        # then we add in all the shares that were bad (corrupted, bad
        # signatures, etc). We want to replace these.
        for key, old_checkstring in self._servermap.bad_shares.items():
            (peerid, shnum) = key
            self.goal.add(key)
            self.bad_share_checkstrings[key] = old_checkstring
            self.connections[peerid] = self._servermap.connections[peerid]

        # create the shares. We'll discard these as they are delivered. SDMF:
        # we're allowed to hold everything in memory.

        self._status.timings["setup"] = time.time() - self._started
        # pipeline: encrypt+encode -> build shares -> note the push start
        # time -> enter the delivery loop. Any failure along the way is
        # routed to _fatal_error.
        d = self._encrypt_and_encode()
        d.addCallback(self._generate_shares)
        def _start_pushing(res):
            # loop() later subtracts this to compute the "push" timing
            self._started_pushing = time.time()
            return res
        d.addCallback(_start_pushing)
        d.addCallback(self.loop) # trigger delivery
        d.addErrback(self._fatal_error)

        # callers wait on done_deferred, not on the internal chain `d`
        return self.done_deferred
259
260     def setup_encoding_parameters(self):
261         segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata))
262         # this must be a multiple of self.required_shares
263         segment_size = mathutil.next_multiple(segment_size,
264                                               self.required_shares)
265         self.segment_size = segment_size
266         if segment_size:
267             self.num_segments = mathutil.div_ceil(len(self.newdata),
268                                                   segment_size)
269         else:
270             self.num_segments = 0
271         assert self.num_segments in [0, 1,] # SDMF restrictions
272
273     def _fatal_error(self, f):
274         self.log("error during loop", failure=f, level=log.UNUSUAL)
275         self._done(f)
276
277     def _update_status(self):
278         self._status.set_status("Sending Shares: %d placed out of %d, "
279                                 "%d messages outstanding" %
280                                 (len(self.placed),
281                                  len(self.goal),
282                                  len(self.outstanding)))
283         self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
284
285     def loop(self, ignored=None):
286         self.log("entering loop", level=log.NOISY)
287         if not self._running:
288             return
289
290         self.looplimit -= 1
291         if self.looplimit <= 0:
292             raise RuntimeError("loop limit exceeded")
293
294         if self.surprised:
295             # don't send out any new shares, just wait for the outstanding
296             # ones to be retired.
297             self.log("currently surprised, so don't send any new shares",
298                      level=log.NOISY)
299         else:
300             self.update_goal()
301             # how far are we from our goal?
302             needed = self.goal - self.placed - self.outstanding
303             self._update_status()
304
305             if needed:
306                 # we need to send out new shares
307                 self.log(format="need to send %(needed)d new shares",
308                          needed=len(needed), level=log.NOISY)
309                 self._send_shares(needed)
310                 return
311
312         if self.outstanding:
313             # queries are still pending, keep waiting
314             self.log(format="%(outstanding)d queries still outstanding",
315                      outstanding=len(self.outstanding),
316                      level=log.NOISY)
317             return
318
319         # no queries outstanding, no placements needed: we're done
320         self.log("no queries outstanding, no placements needed: done",
321                  level=log.OPERATIONAL)
322         now = time.time()
323         elapsed = now - self._started_pushing
324         self._status.timings["push"] = elapsed
325         return self._done(None)
326
327     def log_goal(self, goal, message=""):
328         logmsg = [message]
329         for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]):
330             logmsg.append("sh%d to [%s]" % (shnum,
331                                             idlib.shortnodeid_b2a(peerid)))
332         self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
333         self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
334                  level=log.NOISY)
335
336     def update_goal(self):
337         # if log.recording_noisy
338         if True:
339             self.log_goal(self.goal, "before update: ")
340
341         # first, remove any bad peers from our goal
342         self.goal = set([ (peerid, shnum)
343                           for (peerid, shnum) in self.goal
344                           if peerid not in self.bad_peers ])
345
346         # find the homeless shares:
347         homefull_shares = set([shnum for (peerid, shnum) in self.goal])
348         homeless_shares = set(range(self.total_shares)) - homefull_shares
349         homeless_shares = sorted(list(homeless_shares))
350         # place them somewhere. We prefer unused servers at the beginning of
351         # the available peer list.
352
353         if not homeless_shares:
354             return
355
356         # if an old share X is on a node, put the new share X there too.
357         # TODO: 1: redistribute shares to achieve one-per-peer, by copying
358         #       shares from existing peers to new (less-crowded) ones. The
359         #       old shares must still be updated.
360         # TODO: 2: move those shares instead of copying them, to reduce future
361         #       update work
362
363         # this is a bit CPU intensive but easy to analyze. We create a sort
364         # order for each peerid. If the peerid is marked as bad, we don't
365         # even put them in the list. Then we care about the number of shares
366         # which have already been assigned to them. After that we care about
367         # their permutation order.
368         old_assignments = DictOfSets()
369         for (peerid, shnum) in self.goal:
370             old_assignments.add(peerid, shnum)
371
372         peerlist = []
373         for i, (peerid, ss) in enumerate(self.full_peerlist):
374             if peerid in self.bad_peers:
375                 continue
376             entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
377             peerlist.append(entry)
378         peerlist.sort()
379
380         if not peerlist:
381             raise NotEnoughServersError("Ran out of non-bad servers")
382
383         new_assignments = []
384         # we then index this peerlist with an integer, because we may have to
385         # wrap. We update the goal as we go.
386         i = 0
387         for shnum in homeless_shares:
388             (ignored1, ignored2, peerid, ss) = peerlist[i]
389             # if we are forced to send a share to a server that already has
390             # one, we may have two write requests in flight, and the
391             # servermap (which was computed before either request was sent)
392             # won't reflect the new shares, so the second response will be
393             # surprising. There is code in _got_write_answer() to tolerate
394             # this, otherwise it would cause the publish to fail with an
395             # UncoordinatedWriteError. See #546 for details of the trouble
396             # this used to cause.
397             self.goal.add( (peerid, shnum) )
398             self.connections[peerid] = ss
399             i += 1
400             if i >= len(peerlist):
401                 i = 0
402         if True:
403             self.log_goal(self.goal, "after update: ")
404
405
406
407     def _encrypt_and_encode(self):
408         # this returns a Deferred that fires with a list of (sharedata,
409         # sharenum) tuples. TODO: cache the ciphertext, only produce the
410         # shares that we care about.
411         self.log("_encrypt_and_encode")
412
413         self._status.set_status("Encrypting")
414         started = time.time()
415
416         key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
417         enc = AES(key)
418         crypttext = enc.process(self.newdata)
419         assert len(crypttext) == len(self.newdata)
420
421         now = time.time()
422         self._status.timings["encrypt"] = now - started
423         started = now
424
425         # now apply FEC
426
427         self._status.set_status("Encoding")
428         fec = codec.CRSEncoder()
429         fec.set_params(self.segment_size,
430                        self.required_shares, self.total_shares)
431         piece_size = fec.get_block_size()
432         crypttext_pieces = [None] * self.required_shares
433         for i in range(len(crypttext_pieces)):
434             offset = i * piece_size
435             piece = crypttext[offset:offset+piece_size]
436             piece = piece + "\x00"*(piece_size - len(piece)) # padding
437             crypttext_pieces[i] = piece
438             assert len(piece) == piece_size
439
440         d = fec.encode(crypttext_pieces)
441         def _done_encoding(res):
442             elapsed = time.time() - started
443             self._status.timings["encode"] = elapsed
444             return res
445         d.addCallback(_done_encoding)
446         return d
447
448     def _generate_shares(self, shares_and_shareids):
449         # this sets self.shares and self.root_hash
450         self.log("_generate_shares")
451         self._status.set_status("Generating Shares")
452         started = time.time()
453
454         # we should know these by now
455         privkey = self._privkey
456         encprivkey = self._encprivkey
457         pubkey = self._pubkey
458
459         (shares, share_ids) = shares_and_shareids
460
461         assert len(shares) == len(share_ids)
462         assert len(shares) == self.total_shares
463         all_shares = {}
464         block_hash_trees = {}
465         share_hash_leaves = [None] * len(shares)
466         for i in range(len(shares)):
467             share_data = shares[i]
468             shnum = share_ids[i]
469             all_shares[shnum] = share_data
470
471             # build the block hash tree. SDMF has only one leaf.
472             leaves = [hashutil.block_hash(share_data)]
473             t = hashtree.HashTree(leaves)
474             block_hash_trees[shnum] = block_hash_tree = list(t)
475             share_hash_leaves[shnum] = t[0]
476         for leaf in share_hash_leaves:
477             assert leaf is not None
478         share_hash_tree = hashtree.HashTree(share_hash_leaves)
479         share_hash_chain = {}
480         for shnum in range(self.total_shares):
481             needed_hashes = share_hash_tree.needed_hashes(shnum)
482             share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
483                                               for i in needed_hashes ] )
484         root_hash = share_hash_tree[0]
485         assert len(root_hash) == 32
486         self.log("my new root_hash is %s" % base32.b2a(root_hash))
487         self._new_version_info = (self._new_seqnum, root_hash, self.salt)
488
489         prefix = pack_prefix(self._new_seqnum, root_hash, self.salt,
490                              self.required_shares, self.total_shares,
491                              self.segment_size, len(self.newdata))
492
493         # now pack the beginning of the share. All shares are the same up
494         # to the signature, then they have divergent share hash chains,
495         # then completely different block hash trees + salt + share data,
496         # then they all share the same encprivkey at the end. The sizes
497         # of everything are the same for all shares.
498
499         sign_started = time.time()
500         signature = privkey.sign(prefix)
501         self._status.timings["sign"] = time.time() - sign_started
502
503         verification_key = pubkey.serialize()
504
505         final_shares = {}
506         for shnum in range(self.total_shares):
507             final_share = pack_share(prefix,
508                                      verification_key,
509                                      signature,
510                                      share_hash_chain[shnum],
511                                      block_hash_trees[shnum],
512                                      all_shares[shnum],
513                                      encprivkey)
514             final_shares[shnum] = final_share
515         elapsed = time.time() - started
516         self._status.timings["pack"] = elapsed
517         self.shares = final_shares
518         self.root_hash = root_hash
519
520         # we also need to build up the version identifier for what we're
521         # pushing. Extract the offsets from one of our shares.
522         assert final_shares
523         offsets = unpack_header(final_shares.values()[0])[-1]
524         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
525         verinfo = (self._new_seqnum, root_hash, self.salt,
526                    self.segment_size, len(self.newdata),
527                    self.required_shares, self.total_shares,
528                    prefix, offsets_tuple)
529         self.versioninfo = verinfo
530
531
532
    def _send_shares(self, needed):
        """Send one write request per peer for the shares in `needed`.

        `needed` is a set of (peerid, shnum) tuples. For each pair a test
        vector is chosen (match the old checkstring we know about, or
        require that no share exists) together with a write vector holding
        the complete new share. The pairs are grouped by peerid and each
        peer receives a single message through _do_testreadwrite. Every
        pair is added to self.outstanding before its message is sent.

        Answers are handled by _got_write_answer / _got_write_error, after
        which self.loop runs again; a failure in that chain goes to
        _fatal_error. Returns None.
        """
        self.log("_send_shares")

        # we're finally ready to send out our shares. If we encounter any
        # surprises here, it's because somebody else is writing at the same
        # time. (Note: in the future, when we remove the _query_peers() step
        # and instead speculate about [or remember] which shares are where,
        # surprises here are *not* indications of UncoordinatedWriteError,
        # and we'll need to respond to them more gracefully.)

        # needed is a set of (peerid, shnum) tuples. The first thing we do is
        # organize it by peerid.

        # NOTE(review): peermap is filled in here but not read again in this
        # method; the per-peer grouping that is actually used is
        # all_tw_vectors below.
        peermap = DictOfSets()
        for (peerid, shnum) in needed:
            peermap.add(peerid, shnum)

        # the next thing is to build up a bunch of test vectors. The
        # semantics of Publish are that we perform the operation if the world
        # hasn't changed since the ServerMap was constructed (more or less).
        # For every share we're trying to place, we create a test vector that
        # tests to see if the server*share still corresponds to the
        # map.

        all_tw_vectors = {} # maps peerid to tw_vectors
        sm = self._servermap.servermap

        for key in needed:
            (peerid, shnum) = key

            if key in sm:
                # an old version of that share already exists on the
                # server, according to our servermap. We will create a
                # request that attempts to replace it.
                old_versionid, old_timestamp = sm[key]
                (old_seqnum, old_root_hash, old_salt, old_segsize,
                 old_datalength, old_k, old_N, old_prefix,
                 old_offsets_tuple) = old_versionid
                # only seqnum, root hash and salt go into the checkstring
                old_checkstring = pack_checkstring(old_seqnum,
                                                   old_root_hash,
                                                   old_salt)
                testv = (0, len(old_checkstring), "eq", old_checkstring)

            elif key in self.bad_share_checkstrings:
                # a share the servermap recorded as bad: test against the
                # checkstring that was saved for it in publish()
                old_checkstring = self.bad_share_checkstrings[key]
                testv = (0, len(old_checkstring), "eq", old_checkstring)

            else:
                # add a testv that requires the share not exist

                # Unfortunately, foolscap-0.2.5 has a bug in the way inbound
                # constraints are handled. If the same object is referenced
                # multiple times inside the arguments, foolscap emits a
                # 'reference' token instead of a distinct copy of the
                # argument. The bug is that these 'reference' tokens are not
                # accepted by the inbound constraint code. To work around
                # this, we need to prevent python from interning the
                # (constant) tuple, by creating a new copy of this vector
                # each time.

                # This bug is fixed in foolscap-0.2.6, and even though this
                # version of Tahoe requires foolscap-0.3.1 or newer, we are
                # supposed to be able to interoperate with older versions of
                # Tahoe which are allowed to use older versions of foolscap,
                # including foolscap-0.2.5 . In addition, I've seen other
                # foolscap problems triggered by 'reference' tokens (see #541
                # for details). So we must keep this workaround in place.

                #testv = (0, 1, 'eq', "")
                testv = tuple([0, 1, 'eq', ""])

            testvs = [testv]
            # the write vector is simply the share
            writev = [(0, self.shares[shnum])]

            if peerid not in all_tw_vectors:
                all_tw_vectors[peerid] = {}
                # maps shnum to (testvs, writevs, new_length)
            # each (peerid, shnum) pair may appear only once per message
            assert shnum not in all_tw_vectors[peerid]

            # new_length is None for every share
            all_tw_vectors[peerid][shnum] = (testvs, writev, None)

        # we read the checkstring back from each share, however we only use
        # it to detect whether there was a new share that we didn't know
        # about. The success or failure of the write will tell us whether
        # there was a collision or not. If there is a collision, the first
        # thing we'll do is update the servermap, which will find out what
        # happened. We could conceivably reduce a roundtrip by using the
        # readv checkstring to populate the servermap, but really we'd have
        # to read enough data to validate the signatures too, so it wouldn't
        # be an overall win.
        read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]

        # ok, send the messages!
        # NOTE(review): all_tw_vectors is keyed by peerid, so the number
        # logged here (and in the closing message) counts peers/messages
        # rather than individual shares.
        self.log("sending %d shares" % len(all_tw_vectors), level=log.NOISY)
        # one shared start time is used for the per-server timing of every
        # message sent in this batch
        started = time.time()
        for (peerid, tw_vectors) in all_tw_vectors.items():

            write_enabler = self._node.get_write_enabler(peerid)
            renew_secret = self._node.get_renewal_secret(peerid)
            cancel_secret = self._node.get_cancel_secret(peerid)
            secrets = (write_enabler, renew_secret, cancel_secret)
            shnums = tw_vectors.keys()

            # mark the pairs as in flight before the request goes out
            for shnum in shnums:
                self.outstanding.add( (peerid, shnum) )

            d = self._do_testreadwrite(peerid, secrets,
                                       tw_vectors, read_vector)
            d.addCallbacks(self._got_write_answer, self._got_write_error,
                           callbackArgs=(peerid, shnums, started),
                           errbackArgs=(peerid, shnums, started))
            # whatever the answer was, take another pass through the loop
            d.addCallback(self.loop)
            d.addErrback(self._fatal_error)

        self._update_status()
        self.log("%d shares sent" % len(all_tw_vectors), level=log.NOISY)
650
651     def _do_testreadwrite(self, peerid, secrets,
652                           tw_vectors, read_vector):
653         storage_index = self._storage_index
654         ss = self.connections[peerid]
655
656         #print "SS[%s] is %s" % (idlib.shortnodeid_b2a(peerid), ss), ss.tracker.interfaceName
657         d = ss.callRemote("slot_testv_and_readv_and_writev",
658                           storage_index,
659                           secrets,
660                           tw_vectors,
661                           read_vector)
662         return d
663
664     def _got_write_answer(self, answer, peerid, shnums, started):
665         lp = self.log("_got_write_answer from %s" %
666                       idlib.shortnodeid_b2a(peerid))
667         for shnum in shnums:
668             self.outstanding.discard( (peerid, shnum) )
669
670         now = time.time()
671         elapsed = now - started
672         self._status.add_per_server_time(peerid, elapsed)
673
674         wrote, read_data = answer
675
676         surprise_shares = set(read_data.keys()) - set(shnums)
677
678         surprised = False
679         for shnum in surprise_shares:
680             # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
681             checkstring = read_data[shnum][0]
682             their_version_info = unpack_checkstring(checkstring)
683             if their_version_info == self._new_version_info:
684                 # they have the right share, somehow
685
686                 if (peerid,shnum) in self.goal:
687                     # and we want them to have it, so we probably sent them a
688                     # copy in an earlier write. This is ok, and avoids the
689                     # #546 problem.
690                     continue
691
692                 # They aren't in our goal, but they are still for the right
693                 # version. Somebody else wrote them, and it's a convergent
694                 # uncoordinated write. Pretend this is ok (don't be
695                 # surprised), since I suspect there's a decent chance that
696                 # we'll hit this in normal operation.
697                 continue
698
699             else:
700                 # the new shares are of a different version
701                 if peerid in self._servermap.reachable_peers:
702                     # we asked them about their shares, so we had knowledge
703                     # of what they used to have. Any surprising shares must
704                     # have come from someone else, so UCW.
705                     surprised = True
706                 else:
707                     # we didn't ask them, and now we've discovered that they
708                     # have a share we didn't know about. This indicates that
709                     # mapupdate should have wokred harder and asked more
710                     # servers before concluding that it knew about them all.
711
712                     # signal UCW, but make sure to ask this peer next time,
713                     # so we'll remember to update it if/when we retry.
714                     surprised = True
715                     # TODO: ask this peer next time. I don't yet have a good
716                     # way to do this. Two insufficient possibilities are:
717                     #
718                     # self._servermap.add_new_share(peerid, shnum, verinfo, now)
719                     #  but that requires fetching/validating/parsing the whole
720                     #  version string, and all we have is the checkstring
721                     # self._servermap.mark_bad_share(peerid, shnum, checkstring)
722                     #  that will make publish overwrite the share next time,
723                     #  but it won't re-query the server, and it won't make
724                     #  mapupdate search further
725
726                     # TODO later: when publish starts, do
727                     # servermap.get_best_version(), extract the seqnum,
728                     # subtract one, and store as highest-replaceable-seqnum.
729                     # Then, if this surprise-because-we-didn't-ask share is
730                     # of highest-replaceable-seqnum or lower, we're allowed
731                     # to replace it: send out a new writev (or rather add it
732                     # to self.goal and loop).
733                     pass
734
735                 surprised = True
736
737         if surprised:
738             self.log("they had shares %s that we didn't know about" %
739                      (list(surprise_shares),),
740                      parent=lp, level=log.WEIRD, umid="un9CSQ")
741             self.surprised = True
742
743         if not wrote:
744             # TODO: there are two possibilities. The first is that the server
745             # is full (or just doesn't want to give us any room), which means
746             # we shouldn't ask them again, but is *not* an indication of an
747             # uncoordinated write. The second is that our testv failed, which
748             # *does* indicate an uncoordinated write. We currently don't have
749             # a way to tell these two apart (in fact, the storage server code
750             # doesn't have the option of refusing our share).
751             #
752             # If the server is full, mark the peer as bad (so we don't ask
753             # them again), but don't set self.surprised. The loop() will find
754             # a new server.
755             #
756             # If the testv failed, log it, set self.surprised, but don't
757             # bother adding to self.bad_peers .
758
759             self.log("our testv failed, so the write did not happen",
760                      parent=lp, level=log.WEIRD, umid="8sc26g")
761             self.surprised = True
762             self.bad_peers.add(peerid) # don't ask them again
763             # use the checkstring to add information to the log message
764             for (shnum,readv) in read_data.items():
765                 checkstring = readv[0]
766                 (other_seqnum,
767                  other_roothash,
768                  other_salt) = unpack_checkstring(checkstring)
769                 expected_version = self._servermap.version_on_peer(peerid,
770                                                                    shnum)
771                 if expected_version:
772                     (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
773                      offsets_tuple) = expected_version
774                     self.log("somebody modified the share on us:"
775                              " shnum=%d: I thought they had #%d:R=%s,"
776                              " but testv reported #%d:R=%s" %
777                              (shnum,
778                               seqnum, base32.b2a(root_hash)[:4],
779                               other_seqnum, base32.b2a(other_roothash)[:4]),
780                              parent=lp, level=log.NOISY)
781                 # if expected_version==None, then we didn't expect to see a
782                 # share on that peer, and the 'surprise_shares' clause above
783                 # will have logged it.
784             # self.loop() will take care of finding new homes
785             return
786
787         for shnum in shnums:
788             self.placed.add( (peerid, shnum) )
789             # and update the servermap
790             self._servermap.add_new_share(peerid, shnum,
791                                           self.versioninfo, started)
792
793         # self.loop() will take care of checking to see if we're done
794         return
795
796     def _got_write_error(self, f, peerid, shnums, started):
797         for shnum in shnums:
798             self.outstanding.discard( (peerid, shnum) )
799         self.bad_peers.add(peerid)
800         self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s",
801                  shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid),
802                  failure=f,
803                  level=log.UNUSUAL)
804         # self.loop() will take care of checking to see if we're done
805         return
806
807
808     def _done(self, res):
809         if not self._running:
810             return
811         self._running = False
812         now = time.time()
813         self._status.timings["total"] = now - self._started
814         self._status.set_active(False)
815         if isinstance(res, failure.Failure):
816             self.log("Publish done, with failure", failure=res,
817                      level=log.WEIRD, umid="nRsR9Q")
818             self._status.set_status("Failed")
819         elif self.surprised:
820             self.log("Publish done, UncoordinatedWriteError", level=log.UNUSUAL)
821             self._status.set_status("UncoordinatedWriteError")
822             # deliver a failure
823             res = failure.Failure(UncoordinatedWriteError())
824             # TODO: recovery
825         else:
826             self.log("Publish done, success")
827             self._status.set_status("Done")
828             self._status.set_progress(1.0)
829         eventually(self.done_deferred.callback, res)
830
831