mutable/publish: stop using RuntimeError, for #639
src/allmydata/mutable/publish.py
1
2
3 import os, struct, time
4 from itertools import count
5 from zope.interface import implements
6 from twisted.internet import defer
7 from twisted.python import failure
8 from allmydata.interfaces import IPublishStatus, FileTooLargeError
9 from allmydata.util import base32, hashutil, mathutil, idlib, log
10 from allmydata import hashtree, codec
11 from allmydata.storage.server import si_b2a
12 from pycryptopp.cipher.aes import AES
13 from foolscap.eventual import eventually
14
15 from common import MODE_WRITE, MODE_CHECK, DictOfSets, \
16      UncoordinatedWriteError, NotEnoughServersError
17 from servermap import ServerMap
18 from layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
19      unpack_checkstring, SIGNED_PREFIX
20
21 class PublishStatus:
22     implements(IPublishStatus)
23     statusid_counter = count(0)
24     def __init__(self):
25         self.timings = {}
26         self.timings["send_per_server"] = {}
27         self.servermap = None
28         self.problems = {}
29         self.active = True
30         self.storage_index = None
31         self.helper = False
32         self.encoding = ("?", "?")
33         self.size = None
34         self.status = "Not started"
35         self.progress = 0.0
36         self.counter = self.statusid_counter.next()
37         self.started = time.time()
38
39     def add_per_server_time(self, peerid, elapsed):
40         if peerid not in self.timings["send_per_server"]:
41             self.timings["send_per_server"][peerid] = []
42         self.timings["send_per_server"][peerid].append(elapsed)
43
44     def get_started(self):
45         return self.started
46     def get_storage_index(self):
47         return self.storage_index
48     def get_encoding(self):
49         return self.encoding
50     def using_helper(self):
51         return self.helper
52     def get_servermap(self):
53         return self.servermap
54     def get_size(self):
55         return self.size
56     def get_status(self):
57         return self.status
58     def get_progress(self):
59         return self.progress
60     def get_active(self):
61         return self.active
62     def get_counter(self):
63         return self.counter
64
65     def set_storage_index(self, si):
66         self.storage_index = si
67     def set_helper(self, helper):
68         self.helper = helper
69     def set_servermap(self, servermap):
70         self.servermap = servermap
71     def set_encoding(self, k, n):
72         self.encoding = (k, n)
73     def set_size(self, size):
74         self.size = size
75     def set_status(self, status):
76         self.status = status
77     def set_progress(self, value):
78         self.progress = value
79     def set_active(self, value):
80         self.active = value
81
82 class LoopLimitExceededError(Exception):
83     pass
84
85 class Publish:
86     """I represent a single act of publishing the mutable file to the grid. I
87     will only publish my data if the servermap I am using still represents
88     the current state of the world.
89
90     To make the initial publish, set servermap to None.
91     """
92
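    # A rough usage sketch (illustrative only; the servermap-refresh helper
    # named below is an assumption, not necessarily this codebase's API):
    #
    #   smap = update_servermap(filenode, mode=MODE_WRITE)  # hypothetical helper
    #   p = Publish(filenode, smap)
    #   d = p.publish(new_contents)   # fires with None, or errbacks
    #
    # For the very first publish of a brand-new mutable file, pass
    # servermap=None: publish() will then start from an empty ServerMap
    # and use seqnum 1.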
93     # we limit the segment size as usual to constrain our memory footprint.
94     # The max segsize is higher for mutable files, because we want to support
95     # dirnodes with up to 10k children, and each child uses about 330 bytes.
96     # If you actually put that much into a directory you'll be using a
97     # footprint of around 14MB, which is higher than we'd like, but it is
98     # more important right now to support large directories than to make
99     # memory usage small when you use them. Once we implement MDMF (with
100     # multiple segments), we will drop this back down, probably to 128KiB.
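    # (For scale: 10k children * ~330 bytes is about 3.3MB of dirnode data,
    # which fits under the 3.5MB single-segment limit below; the ~14MB
    # figure is a rough estimate of holding the plaintext, ciphertext, and
    # the expanded/packed shares in memory at once, not a measured number.)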
101     MAX_SEGMENT_SIZE = 3500000
102
103     def __init__(self, filenode, servermap):
104         self._node = filenode
105         self._servermap = servermap
106         self._storage_index = self._node.get_storage_index()
107         self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
108         num = self._node._client.log("Publish(%s): starting" % prefix)
109         self._log_number = num
110         self._running = True
111
112         self._status = PublishStatus()
113         self._status.set_storage_index(self._storage_index)
114         self._status.set_helper(False)
115         self._status.set_progress(0.0)
116         self._status.set_active(True)
117
118     def get_status(self):
119         return self._status
120
121     def log(self, *args, **kwargs):
122         if 'parent' not in kwargs:
123             kwargs['parent'] = self._log_number
124         if "facility" not in kwargs:
125             kwargs["facility"] = "tahoe.mutable.publish"
126         return log.msg(*args, **kwargs)
127
128     def publish(self, newdata):
129         """Publish the filenode's current contents.  Returns a Deferred that
130         fires (with None) when the publish has done as much work as it's ever
131         going to do, or errbacks with UncoordinatedWriteError if it detects a
132         simultaneous write.
133         """
134
135         # 1: generate shares (SDMF: files are small, so we can do it in RAM)
136         # 2: perform peer selection, get candidate servers
137         #  2a: send queries to n+epsilon servers, to determine current shares
138         #  2b: based upon responses, create target map
139         # 3: send slot_testv_and_readv_and_writev messages
140         # 4: as responses return, update share-dispatch table
141         # 4a: may need to run recovery algorithm
142         # 5: when enough responses are back, we're done
143
144         self.log("starting publish, datalen is %s" % len(newdata))
145         if len(newdata) > self.MAX_SEGMENT_SIZE:
146             raise FileTooLargeError("SDMF is limited to one segment, and "
147                                     "%d > %d" % (len(newdata),
148                                                  self.MAX_SEGMENT_SIZE))
149         self._status.set_size(len(newdata))
150         self._status.set_status("Started")
151         self._started = time.time()
152
153         self.done_deferred = defer.Deferred()
154
155         self._writekey = self._node.get_writekey()
156         assert self._writekey, "need write capability to publish"
157
158         # first, which servers will we publish to? We require that the
159         # servermap was updated in MODE_WRITE, so we can depend upon the
160         # peerlist computed by that process instead of computing our own.
161         if self._servermap:
162             assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
163             # we will push a version that is one larger than anything present
164             # in the grid, according to the servermap.
165             self._new_seqnum = self._servermap.highest_seqnum() + 1
166         else:
167             # If we don't have a servermap, that's because we're doing the
168             # initial publish
169             self._new_seqnum = 1
170             self._servermap = ServerMap()
171         self._status.set_servermap(self._servermap)
172
173         self.log(format="new seqnum will be %(seqnum)d",
174                  seqnum=self._new_seqnum, level=log.NOISY)
175
176         # having an up-to-date servermap (or using a filenode that was just
177         # created for the first time) also guarantees that the following
178         # fields are available
179         self.readkey = self._node.get_readkey()
180         self.required_shares = self._node.get_required_shares()
181         assert self.required_shares is not None
182         self.total_shares = self._node.get_total_shares()
183         assert self.total_shares is not None
184         self._status.set_encoding(self.required_shares, self.total_shares)
185
186         self._pubkey = self._node.get_pubkey()
187         assert self._pubkey
188         self._privkey = self._node.get_privkey()
189         assert self._privkey
190         self._encprivkey = self._node.get_encprivkey()
191
192         client = self._node._client
193         full_peerlist = client.get_permuted_peers("storage",
194                                                   self._storage_index)
195         self.full_peerlist = full_peerlist # for use later, immutable
196         self.bad_peers = set() # peerids who have errbacked/refused requests
197
198         self.newdata = newdata
199         self.salt = os.urandom(16)
200
201         self.setup_encoding_parameters()
202
203         # if we experience any surprises (writes which were rejected because
204         # our test vector did not match, or shares which we didn't expect to
205         # see), we set this flag and report an UncoordinatedWriteError at the
206         # end of the publish process.
207         self.surprised = False
208
209         # as a failsafe, refuse to iterate through self.loop more than a
210         # thousand times.
211         self.looplimit = 1000
212
213         # we keep track of three tables. The first is our goal: which shares
214         # we want to see on which servers. This is initially populated by the
215         # existing servermap.
216         self.goal = set() # pairs of (peerid, shnum) tuples
217
218         # the second table is our list of outstanding queries: those which
219         # are in flight and may or may not be delivered, accepted, or
220         # acknowledged. Items are added to this table when the request is
221         # sent, and removed when the response returns (or errbacks).
222         self.outstanding = set() # (peerid, shnum) tuples
223
224         # the third is a table of successes: shares which have actually been
225         # placed. These are populated when responses come back with success.
226         # When self.placed == self.goal, we're done.
227         self.placed = set() # (peerid, shnum) tuples
228
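        # e.g. midway through a publish these three sets might look like:
        #   self.goal        = {(peerA, 0), (peerB, 1), (peerC, 2)}
        #   self.placed      = {(peerA, 0), (peerB, 1)}
        #   self.outstanding = {(peerC, 2)}
        # (peerA/peerB/peerC stand in for the real binary peerids)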
229         # we also keep a mapping from peerid to RemoteReference. Each time we
230         # pull a connection out of the full peerlist, we add it to this for
231         # use later.
232         self.connections = {}
233
234         self.bad_share_checkstrings = {}
235
236         # we use the servermap to populate the initial goal: this way we will
237         # try to update each existing share in place.
238         for (peerid, shnum) in self._servermap.servermap:
239             self.goal.add( (peerid, shnum) )
240             self.connections[peerid] = self._servermap.connections[peerid]
241         # then we add in all the shares that were bad (corrupted, bad
242         # signatures, etc). We want to replace these.
243         for key, old_checkstring in self._servermap.bad_shares.items():
244             (peerid, shnum) = key
245             self.goal.add(key)
246             self.bad_share_checkstrings[key] = old_checkstring
247             self.connections[peerid] = self._servermap.connections[peerid]
248
249         # create the shares. We'll discard these as they are delivered. SDMF:
250         # we're allowed to hold everything in memory.
251
252         self._status.timings["setup"] = time.time() - self._started
253         d = self._encrypt_and_encode()
254         d.addCallback(self._generate_shares)
255         def _start_pushing(res):
256             self._started_pushing = time.time()
257             return res
258         d.addCallback(_start_pushing)
259         d.addCallback(self.loop) # trigger delivery
260         d.addErrback(self._fatal_error)
261
262         return self.done_deferred
263
264     def setup_encoding_parameters(self):
265         segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata))
266         # this must be a multiple of self.required_shares
267         segment_size = mathutil.next_multiple(segment_size,
268                                               self.required_shares)
269         self.segment_size = segment_size
270         if segment_size:
271             self.num_segments = mathutil.div_ceil(len(self.newdata),
272                                                   segment_size)
273         else:
274             self.num_segments = 0
275         assert self.num_segments in [0, 1,] # SDMF restrictions
276
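    # Worked example for setup_encoding_parameters(), assuming k=3 required
    # shares and 1000 bytes of new data: segment_size starts at
    # min(3500000, 1000) = 1000, gets rounded up to the next multiple of k
    # (next_multiple(1000, 3) = 1002), and num_segments is
    # div_ceil(1000, 1002) = 1, which satisfies the SDMF single-segment
    # restriction asserted above.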
277     def _fatal_error(self, f):
278         self.log("error during loop", failure=f, level=log.UNUSUAL)
279         self._done(f)
280
281     def _update_status(self):
282         self._status.set_status("Sending Shares: %d placed out of %d, "
283                                 "%d messages outstanding" %
284                                 (len(self.placed),
285                                  len(self.goal),
286                                  len(self.outstanding)))
287         self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
288
289     def loop(self, ignored=None):
290         self.log("entering loop", level=log.NOISY)
291         if not self._running:
292             return
293
294         self.looplimit -= 1
295         if self.looplimit <= 0:
296             raise LoopLimitExceededError("loop limit exceeded")
297
298         if self.surprised:
299             # don't send out any new shares, just wait for the outstanding
300             # ones to be retired.
301             self.log("currently surprised, so don't send any new shares",
302                      level=log.NOISY)
303         else:
304             self.update_goal()
305             # how far are we from our goal?
306             needed = self.goal - self.placed - self.outstanding
307             self._update_status()
308
309             if needed:
310                 # we need to send out new shares
311                 self.log(format="need to send %(needed)d new shares",
312                          needed=len(needed), level=log.NOISY)
313                 self._send_shares(needed)
314                 return
315
316         if self.outstanding:
317             # queries are still pending, keep waiting
318             self.log(format="%(outstanding)d queries still outstanding",
319                      outstanding=len(self.outstanding),
320                      level=log.NOISY)
321             return
322
323         # no queries outstanding, no placements needed: we're done
324         self.log("no queries outstanding, no placements needed: done",
325                  level=log.OPERATIONAL)
326         now = time.time()
327         elapsed = now - self._started_pushing
328         self._status.timings["push"] = elapsed
329         return self._done(None)
330
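    # Summary of loop(): while "surprised" we only drain outstanding
    # queries; otherwise we recompute the goal, send writes for any
    # (peerid, shnum) pairs not yet placed or in flight, keep waiting if
    # queries are still outstanding, and declare the publish done only when
    # nothing is needed and nothing is in flight.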
331     def log_goal(self, goal, message=""):
332         logmsg = [message]
333         for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]):
334             logmsg.append("sh%d to [%s]" % (shnum,
335                                             idlib.shortnodeid_b2a(peerid)))
336         self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
337         self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
338                  level=log.NOISY)
339
340     def update_goal(self):
341         # if log.recording_noisy
342         if True:
343             self.log_goal(self.goal, "before update: ")
344
345         # first, remove any bad peers from our goal
346         self.goal = set([ (peerid, shnum)
347                           for (peerid, shnum) in self.goal
348                           if peerid not in self.bad_peers ])
349
350         # find the homeless shares:
351         homefull_shares = set([shnum for (peerid, shnum) in self.goal])
352         homeless_shares = set(range(self.total_shares)) - homefull_shares
353         homeless_shares = sorted(list(homeless_shares))
354         # place them somewhere. We prefer unused servers at the beginning of
355         # the available peer list.
356
357         if not homeless_shares:
358             return
359
360         # if an old share X is on a node, put the new share X there too.
361         # TODO: 1: redistribute shares to achieve one-per-peer, by copying
362         #       shares from existing peers to new (less-crowded) ones. The
363         #       old shares must still be updated.
364         # TODO: 2: move those shares instead of copying them, to reduce future
365         #       update work
366
367         # this is a bit CPU intensive but easy to analyze. We create a sort
368         # order for each peerid. If the peerid is marked as bad, we don't
369         # even put it in the list. Then we care about the number of shares
370         # which have already been assigned to them. After that we care about
371         # their permutation order.
372         old_assignments = DictOfSets()
373         for (peerid, shnum) in self.goal:
374             old_assignments.add(peerid, shnum)
375
376         peerlist = []
377         for i, (peerid, ss) in enumerate(self.full_peerlist):
378             if peerid in self.bad_peers:
379                 continue
380             entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
381             peerlist.append(entry)
382         peerlist.sort()
383
384         if not peerlist:
385             raise NotEnoughServersError("Ran out of non-bad servers")
386
387         new_assignments = []
388         # we then index this peerlist with an integer, because we may have to
389         # wrap. We update the goal as we go.
390         i = 0
391         for shnum in homeless_shares:
392             (ignored1, ignored2, peerid, ss) = peerlist[i]
393             # if we are forced to send a share to a server that already has
394             # one, we may have two write requests in flight, and the
395             # servermap (which was computed before either request was sent)
396             # won't reflect the new shares, so the second response will be
397             # surprising. There is code in _got_write_answer() to tolerate
398             # this, otherwise it would cause the publish to fail with an
399             # UncoordinatedWriteError. See #546 for details of the trouble
400             # this used to cause.
401             self.goal.add( (peerid, shnum) )
402             self.connections[peerid] = ss
403             i += 1
404             if i >= len(peerlist):
405                 i = 0
406         if True:
407             self.log_goal(self.goal, "after update: ")
408
409
410
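    # Small illustration of the placement policy above: with total_shares=4,
    # share 0 already on peer A, and permuted order [A, B, C], the sort key
    # (shares-already-assigned, permuted-index) orders the candidates
    # B(0,1) < C(0,2) < A(1,0), so homeless shares 1, 2, 3 go round-robin to
    # B, C, and then A, wrapping around if there are more homeless shares
    # than usable peers.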
411     def _encrypt_and_encode(self):
412         # this returns a Deferred that fires with the (shares, share_ids)
413         # pair from fec.encode(). TODO: cache the ciphertext, only produce the
414         # shares that we care about.
415         self.log("_encrypt_and_encode")
416
417         self._status.set_status("Encrypting")
418         started = time.time()
419
420         key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
421         enc = AES(key)
422         crypttext = enc.process(self.newdata)
423         assert len(crypttext) == len(self.newdata)
424
425         now = time.time()
426         self._status.timings["encrypt"] = now - started
427         started = now
428
429         # now apply FEC
430
431         self._status.set_status("Encoding")
432         fec = codec.CRSEncoder()
433         fec.set_params(self.segment_size,
434                        self.required_shares, self.total_shares)
435         piece_size = fec.get_block_size()
436         crypttext_pieces = [None] * self.required_shares
437         for i in range(len(crypttext_pieces)):
438             offset = i * piece_size
439             piece = crypttext[offset:offset+piece_size]
440             piece = piece + "\x00"*(piece_size - len(piece)) # padding
441             crypttext_pieces[i] = piece
442             assert len(piece) == piece_size
443
444         d = fec.encode(crypttext_pieces)
445         def _done_encoding(res):
446             elapsed = time.time() - started
447             self._status.timings["encode"] = elapsed
448             return res
449         d.addCallback(_done_encoding)
450         return d
451
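    # Continuing the example above with k=3, N=10, segment_size=1002: the
    # encoder's block size should be 1002/3 = 334 bytes (which is why
    # segment_size is forced to a multiple of k), the ciphertext is split
    # and zero-padded into 3 pieces of 334 bytes, and fec.encode() fires
    # with (shares, share_ids): 10 blocks of 334 bytes plus their share
    # numbers, consumed by _generate_shares() below. (The exact block-size
    # formula is an assumption inferred from the padding loop above.)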
452     def _generate_shares(self, shares_and_shareids):
453         # this sets self.shares and self.root_hash
454         self.log("_generate_shares")
455         self._status.set_status("Generating Shares")
456         started = time.time()
457
458         # we should know these by now
459         privkey = self._privkey
460         encprivkey = self._encprivkey
461         pubkey = self._pubkey
462
463         (shares, share_ids) = shares_and_shareids
464
465         assert len(shares) == len(share_ids)
466         assert len(shares) == self.total_shares
467         all_shares = {}
468         block_hash_trees = {}
469         share_hash_leaves = [None] * len(shares)
470         for i in range(len(shares)):
471             share_data = shares[i]
472             shnum = share_ids[i]
473             all_shares[shnum] = share_data
474
475             # build the block hash tree. SDMF has only one leaf.
476             leaves = [hashutil.block_hash(share_data)]
477             t = hashtree.HashTree(leaves)
478             block_hash_trees[shnum] = block_hash_tree = list(t)
479             share_hash_leaves[shnum] = t[0]
480         for leaf in share_hash_leaves:
481             assert leaf is not None
482         share_hash_tree = hashtree.HashTree(share_hash_leaves)
483         share_hash_chain = {}
484         for shnum in range(self.total_shares):
485             needed_hashes = share_hash_tree.needed_hashes(shnum)
486             share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
487                                               for i in needed_hashes ] )
488         root_hash = share_hash_tree[0]
489         assert len(root_hash) == 32
490         self.log("my new root_hash is %s" % base32.b2a(root_hash))
491         self._new_version_info = (self._new_seqnum, root_hash, self.salt)
492
493         prefix = pack_prefix(self._new_seqnum, root_hash, self.salt,
494                              self.required_shares, self.total_shares,
495                              self.segment_size, len(self.newdata))
496
497         # now pack the beginning of the share. All shares are the same up
498         # to the signature, then they have divergent share hash chains,
499         # then completely different block hash trees + salt + share data,
500         # then they all share the same encprivkey at the end. The sizes
501         # of everything are the same for all shares.
502
503         sign_started = time.time()
504         signature = privkey.sign(prefix)
505         self._status.timings["sign"] = time.time() - sign_started
506
507         verification_key = pubkey.serialize()
508
509         final_shares = {}
510         for shnum in range(self.total_shares):
511             final_share = pack_share(prefix,
512                                      verification_key,
513                                      signature,
514                                      share_hash_chain[shnum],
515                                      block_hash_trees[shnum],
516                                      all_shares[shnum],
517                                      encprivkey)
518             final_shares[shnum] = final_share
519         elapsed = time.time() - started
520         self._status.timings["pack"] = elapsed
521         self.shares = final_shares
522         self.root_hash = root_hash
523
524         # we also need to build up the version identifier for what we're
525         # pushing. Extract the offsets from one of our shares.
526         assert final_shares
527         offsets = unpack_header(final_shares.values()[0])[-1]
528         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
529         verinfo = (self._new_seqnum, root_hash, self.salt,
530                    self.segment_size, len(self.newdata),
531                    self.required_shares, self.total_shares,
532                    prefix, offsets_tuple)
533         self.versioninfo = verinfo
534
535
536
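    # For reference, _generate_shares() builds each packed share from the
    # signed prefix, the verification (public) key, the signature, that
    # share's share_hash_chain and block_hash_tree, the share data, and the
    # encrypted private key (see the pack_share() arguments above), and
    # self.versioninfo is the 9-tuple (seqnum, root_hash, salt, segsize,
    # datalen, k, N, prefix, offsets_tuple) used elsewhere to identify
    # exactly which version these shares represent.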
537     def _send_shares(self, needed):
538         self.log("_send_shares")
539
540         # we're finally ready to send out our shares. If we encounter any
541         # surprises here, it's because somebody else is writing at the same
542         # time. (Note: in the future, when we remove the _query_peers() step
543         # and instead speculate about [or remember] which shares are where,
544         # surprises here are *not* indications of UncoordinatedWriteError,
545         # and we'll need to respond to them more gracefully.)
546
547         # needed is a set of (peerid, shnum) tuples. The first thing we do is
548         # organize it by peerid.
549
550         peermap = DictOfSets()
551         for (peerid, shnum) in needed:
552             peermap.add(peerid, shnum)
553
554         # the next thing is to build up a bunch of test vectors. The
555         # semantics of Publish are that we perform the operation if the world
556         # hasn't changed since the ServerMap was constructed (more or less).
557         # For every share we're trying to place, we create a test vector that
558         # tests to see if that (server, share) pair still corresponds to the
559         # map.
560
561         all_tw_vectors = {} # maps peerid to tw_vectors
562         sm = self._servermap.servermap
563
564         for key in needed:
565             (peerid, shnum) = key
566
567             if key in sm:
568                 # an old version of that share already exists on the
569                 # server, according to our servermap. We will create a
570                 # request that attempts to replace it.
571                 old_versionid, old_timestamp = sm[key]
572                 (old_seqnum, old_root_hash, old_salt, old_segsize,
573                  old_datalength, old_k, old_N, old_prefix,
574                  old_offsets_tuple) = old_versionid
575                 old_checkstring = pack_checkstring(old_seqnum,
576                                                    old_root_hash,
577                                                    old_salt)
578                 testv = (0, len(old_checkstring), "eq", old_checkstring)
579
580             elif key in self.bad_share_checkstrings:
581                 old_checkstring = self.bad_share_checkstrings[key]
582                 testv = (0, len(old_checkstring), "eq", old_checkstring)
583
584             else:
585                 # add a testv that requires the share not exist
586
587                 # Unfortunately, foolscap-0.2.5 has a bug in the way inbound
588                 # constraints are handled. If the same object is referenced
589                 # multiple times inside the arguments, foolscap emits a
590                 # 'reference' token instead of a distinct copy of the
591                 # argument. The bug is that these 'reference' tokens are not
592                 # accepted by the inbound constraint code. To work around
593                 # this, we need to prevent python from interning the
594                 # (constant) tuple, by creating a new copy of this vector
595                 # each time.
596
597                 # This bug is fixed in foolscap-0.2.6, and even though this
598                 # version of Tahoe requires foolscap-0.3.1 or newer, we are
599                 # supposed to be able to interoperate with older versions of
600                 # Tahoe which are allowed to use older versions of foolscap,
601                 # including foolscap-0.2.5 . In addition, I've seen other
602                 # foolscap problems triggered by 'reference' tokens (see #541
603                 # for details). So we must keep this workaround in place.
604
605                 #testv = (0, 1, 'eq', "")
606                 testv = tuple([0, 1, 'eq', ""])
607
608             testvs = [testv]
609             # the write vector is simply the share
610             writev = [(0, self.shares[shnum])]
611
612             if peerid not in all_tw_vectors:
613                 all_tw_vectors[peerid] = {}
614                 # maps shnum to (testvs, writevs, new_length)
615             assert shnum not in all_tw_vectors[peerid]
616
617             all_tw_vectors[peerid][shnum] = (testvs, writev, None)
618
619         # we read the checkstring back from each share, however we only use
620         # it to detect whether there was a new share that we didn't know
621         # about. The success or failure of the write will tell us whether
622         # there was a collision or not. If there is a collision, the first
623         # thing we'll do is update the servermap, which will find out what
624         # happened. We could conceivably reduce a roundtrip by using the
625         # readv checkstring to populate the servermap, but really we'd have
626         # to read enough data to validate the signatures too, so it wouldn't
627         # be an overall win.
628         read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]
629
630         # ok, send the messages!
631         self.log("sending %d shares" % len(all_tw_vectors), level=log.NOISY)
632         started = time.time()
633         for (peerid, tw_vectors) in all_tw_vectors.items():
634
635             write_enabler = self._node.get_write_enabler(peerid)
636             renew_secret = self._node.get_renewal_secret(peerid)
637             cancel_secret = self._node.get_cancel_secret(peerid)
638             secrets = (write_enabler, renew_secret, cancel_secret)
639             shnums = tw_vectors.keys()
640
641             for shnum in shnums:
642                 self.outstanding.add( (peerid, shnum) )
643
644             d = self._do_testreadwrite(peerid, secrets,
645                                        tw_vectors, read_vector)
646             d.addCallbacks(self._got_write_answer, self._got_write_error,
647                            callbackArgs=(peerid, shnums, started),
648                            errbackArgs=(peerid, shnums, started))
649             d.addCallback(self.loop)
650             d.addErrback(self._fatal_error)
651
652         self._update_status()
653         self.log("%d shares sent" % len(all_tw_vectors), level=log.NOISY)
654
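    # The per-peer payload assembled above looks roughly like:
    #   tw_vectors = { shnum: ( [ (0, len(checkstring), "eq", checkstring) ],
    #                           [ (0, share_data) ],
    #                           None ) }
    # i.e. per share number: a list of test vectors, a list of
    # (offset, data) write vectors, and no new-length truncation. It is sent
    # together with read_vector, so every response also returns the signed
    # prefix of whatever the server currently holds.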
655     def _do_testreadwrite(self, peerid, secrets,
656                           tw_vectors, read_vector):
657         storage_index = self._storage_index
658         ss = self.connections[peerid]
659
660         #print "SS[%s] is %s" % (idlib.shortnodeid_b2a(peerid), ss), ss.tracker.interfaceName
661         d = ss.callRemote("slot_testv_and_readv_and_writev",
662                           storage_index,
663                           secrets,
664                           tw_vectors,
665                           read_vector)
666         return d
667
668     def _got_write_answer(self, answer, peerid, shnums, started):
669         lp = self.log("_got_write_answer from %s" %
670                       idlib.shortnodeid_b2a(peerid))
671         for shnum in shnums:
672             self.outstanding.discard( (peerid, shnum) )
673
674         now = time.time()
675         elapsed = now - started
676         self._status.add_per_server_time(peerid, elapsed)
677
678         wrote, read_data = answer
679
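        # 'answer' is the (wrote, read_data) pair returned by
        # slot_testv_and_readv_and_writev: wrote is True only if all test
        # vectors matched (so our writes were applied), and read_data maps
        # each shnum the server holds to the list of strings read by
        # read_vector -- here a single checkstring covering the signed
        # prefix.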
680         surprise_shares = set(read_data.keys()) - set(shnums)
681
682         surprised = False
683         for shnum in surprise_shares:
684             # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
685             checkstring = read_data[shnum][0]
686             their_version_info = unpack_checkstring(checkstring)
687             if their_version_info == self._new_version_info:
688                 # they have the right share, somehow
689
690                 if (peerid,shnum) in self.goal:
691                     # and we want them to have it, so we probably sent them a
692                     # copy in an earlier write. This is ok, and avoids the
693                     # #546 problem.
694                     continue
695
696                 # They aren't in our goal, but they are still for the right
697                 # version. Somebody else wrote them, and it's a convergent
698                 # uncoordinated write. Pretend this is ok (don't be
699                 # surprised), since I suspect there's a decent chance that
700                 # we'll hit this in normal operation.
701                 continue
702
703             else:
704                 # the new shares are of a different version
705                 if peerid in self._servermap.reachable_peers:
706                     # we asked them about their shares, so we had knowledge
707                     # of what they used to have. Any surprising shares must
708                     # have come from someone else, so UCW.
709                     surprised = True
710                 else:
711                     # we didn't ask them, and now we've discovered that they
712                     # have a share we didn't know about. This indicates that
713                     # mapupdate should have worked harder and asked more
714                     # servers before concluding that it knew about them all.
715
716                     # signal UCW, but make sure to ask this peer next time,
717                     # so we'll remember to update it if/when we retry.
718                     surprised = True
719                     # TODO: ask this peer next time. I don't yet have a good
720                     # way to do this. Two insufficient possibilities are:
721                     #
722                     # self._servermap.add_new_share(peerid, shnum, verinfo, now)
723                     #  but that requires fetching/validating/parsing the whole
724                     #  version string, and all we have is the checkstring
725                     # self._servermap.mark_bad_share(peerid, shnum, checkstring)
726                     #  that will make publish overwrite the share next time,
727                     #  but it won't re-query the server, and it won't make
728                     #  mapupdate search further
729
730                     # TODO later: when publish starts, do
731                     # servermap.get_best_version(), extract the seqnum,
732                     # subtract one, and store as highest-replaceable-seqnum.
733                     # Then, if this surprise-because-we-didn't-ask share is
734                     # of highest-replaceable-seqnum or lower, we're allowed
735                     # to replace it: send out a new writev (or rather add it
736                     # to self.goal and loop).
737                     pass
738
739                 surprised = True
740
741         if surprised:
742             self.log("they had shares %s that we didn't know about" %
743                      (list(surprise_shares),),
744                      parent=lp, level=log.WEIRD, umid="un9CSQ")
745             self.surprised = True
746
747         if not wrote:
748             # TODO: there are two possibilities. The first is that the server
749             # is full (or just doesn't want to give us any room), which means
750             # we shouldn't ask them again, but is *not* an indication of an
751             # uncoordinated write. The second is that our testv failed, which
752             # *does* indicate an uncoordinated write. We currently don't have
753             # a way to tell these two apart (in fact, the storage server code
754             # doesn't have the option of refusing our share).
755             #
756             # If the server is full, mark the peer as bad (so we don't ask
757             # them again), but don't set self.surprised. The loop() will find
758             # a new server.
759             #
760             # If the testv failed, log it, set self.surprised, but don't
761             # bother adding to self.bad_peers .
762
763             self.log("our testv failed, so the write did not happen",
764                      parent=lp, level=log.WEIRD, umid="8sc26g")
765             self.surprised = True
766             self.bad_peers.add(peerid) # don't ask them again
767             # use the checkstring to add information to the log message
768             for (shnum,readv) in read_data.items():
769                 checkstring = readv[0]
770                 (other_seqnum,
771                  other_roothash,
772                  other_salt) = unpack_checkstring(checkstring)
773                 expected_version = self._servermap.version_on_peer(peerid,
774                                                                    shnum)
775                 if expected_version:
776                     (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
777                      offsets_tuple) = expected_version
778                     self.log("somebody modified the share on us:"
779                              " shnum=%d: I thought they had #%d:R=%s,"
780                              " but testv reported #%d:R=%s" %
781                              (shnum,
782                               seqnum, base32.b2a(root_hash)[:4],
783                               other_seqnum, base32.b2a(other_roothash)[:4]),
784                              parent=lp, level=log.NOISY)
785                 # if expected_version==None, then we didn't expect to see a
786                 # share on that peer, and the 'surprise_shares' clause above
787                 # will have logged it.
788             # self.loop() will take care of finding new homes
789             return
790
791         for shnum in shnums:
792             self.placed.add( (peerid, shnum) )
793             # and update the servermap
794             self._servermap.add_new_share(peerid, shnum,
795                                           self.versioninfo, started)
796
797         # self.loop() will take care of checking to see if we're done
798         return
799
800     def _got_write_error(self, f, peerid, shnums, started):
801         for shnum in shnums:
802             self.outstanding.discard( (peerid, shnum) )
803         self.bad_peers.add(peerid)
804         self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s",
805                  shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid),
806                  failure=f,
807                  level=log.UNUSUAL)
808         # self.loop() will take care of checking to see if we're done
809         return
810
811
812     def _done(self, res):
813         if not self._running:
814             return
815         self._running = False
816         now = time.time()
817         self._status.timings["total"] = now - self._started
818         self._status.set_active(False)
819         if isinstance(res, failure.Failure):
820             self.log("Publish done, with failure", failure=res,
821                      level=log.WEIRD, umid="nRsR9Q")
822             self._status.set_status("Failed")
823         elif self.surprised:
824             self.log("Publish done, UncoordinatedWriteError", level=log.UNUSUAL)
825             self._status.set_status("UncoordinatedWriteError")
826             # deliver a failure
827             res = failure.Failure(UncoordinatedWriteError())
828             # TODO: recovery
829         else:
830             self.log("Publish done, success")
831             self._status.set_status("Done")
832             self._status.set_progress(1.0)
833         eventually(self.done_deferred.callback, res)
834
835