]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/publish.py
logging: add 'unique-message-ids' (or 'umids') to each WEIRD-or-higher log.msg call...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / publish.py
1
2
3 import os, struct, time
4 from itertools import count
5 from zope.interface import implements
6 from twisted.internet import defer
7 from twisted.python import failure
8 from allmydata.interfaces import IPublishStatus, FileTooLargeError
9 from allmydata.util import base32, hashutil, mathutil, idlib, log
10 from allmydata import hashtree, codec, storage
11 from pycryptopp.cipher.aes import AES
12 from foolscap.eventual import eventually
13
14 from common import MODE_WRITE, MODE_CHECK, DictOfSets, \
15      UncoordinatedWriteError, NotEnoughServersError
16 from servermap import ServerMap
17 from layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
18      unpack_checkstring, SIGNED_PREFIX
19
class PublishStatus:
    """Plain record of the state of one publish operation.

    Each instance takes the next value from the class-wide
    'statusid_counter' as its counter, and remembers the wall-clock time
    at which it was created. Everything else is filled in through the
    set_* methods and read back through the matching accessors.
    """
    implements(IPublishStatus)
    statusid_counter = count(0)

    def __init__(self):
        # self.timings["send_per_server"] maps peerid to a list of elapsed
        # times, one entry per call to add_per_server_time()
        self.timings = {"send_per_server": {}}
        self.servermap = None
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        self.encoding = ("?", "?")
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def add_per_server_time(self, peerid, elapsed):
        """Append 'elapsed' to the list of send times kept for 'peerid'."""
        per_server = self.timings["send_per_server"]
        per_server.setdefault(peerid, []).append(elapsed)

    # read-only values, fixed at construction time

    def get_started(self):
        return self.started

    def get_counter(self):
        return self.counter

    # storage index

    def get_storage_index(self):
        return self.storage_index

    def set_storage_index(self, si):
        self.storage_index = si

    # encoding parameters, stored as a (k, n) tuple

    def get_encoding(self):
        return self.encoding

    def set_encoding(self, k, n):
        self.encoding = (k, n)

    # helper flag

    def using_helper(self):
        return self.helper

    def set_helper(self, helper):
        self.helper = helper

    # servermap

    def get_servermap(self):
        return self.servermap

    def set_servermap(self, servermap):
        self.servermap = servermap

    # size

    def get_size(self):
        return self.size

    def set_size(self, size):
        self.size = size

    # human-readable status string

    def get_status(self):
        return self.status

    def set_status(self, status):
        self.status = status

    # progress value

    def get_progress(self):
        return self.progress

    def set_progress(self, value):
        self.progress = value

    # active flag

    def get_active(self):
        return self.active

    def set_active(self, value):
        self.active = value
80
class Publish:
    """I represent a single act of publishing the mutable file to the grid. I
    will only publish my data if the servermap I am using still represents
    the current state of the world.

    To make the initial publish, set servermap to None.
    """

    # we limit the segment size as usual to constrain our memory footprint.
    # The max segsize is higher for mutable files, because we want to support
    # dirnodes with up to 10k children, and each child uses about 330 bytes.
    # If you actually put that much into a directory you'll be using a
    # footprint of around 14MB, which is higher than we'd like, but it is
    # more important right now to support large directories than to make
    # memory usage small when you use them. Once we implement MDMF (with
    # multiple segments), we will drop this back down, probably to 128KiB.
    MAX_SEGMENT_SIZE = 3500000

    def __init__(self, filenode, servermap):
        """Prepare to publish 'filenode' using 'servermap' (or None for the
        initial publish). No network traffic happens until publish() is
        called."""
        self._node = filenode
        self._servermap = servermap
        self._storage_index = self._node.get_storage_index()
        self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
        num = self._node._client.log("Publish(%s): starting" % prefix)
        self._log_number = num
        self._running = True

        self._status = PublishStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)

    def get_status(self):
        """Return the PublishStatus object that tracks this operation."""
        return self._status

    def log(self, *args, **kwargs):
        """Emit a log message via log.msg(), defaulting 'parent' to this
        operation's log entry and 'facility' to "tahoe.mutable.publish".
        Returns whatever log.msg() returns."""
        if 'parent' not in kwargs:
            kwargs['parent'] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.publish"
        return log.msg(*args, **kwargs)

    def publish(self, newdata):
        """Publish the filenode's current contents.  Returns a Deferred that
        fires (with None) when the publish has done as much work as it's ever
        going to do, or errbacks with UncoordinatedWriteError if it detects a
        simultaneous write. Any other failure raised while encoding or
        pushing shares is delivered through the same Deferred.

        Raises FileTooLargeError synchronously (before any Deferred is
        created) if newdata is longer than MAX_SEGMENT_SIZE.
        """

        # 1: generate shares (SDMF: files are small, so we can do it in RAM)
        # 2: perform peer selection, get candidate servers
        #  2a: send queries to n+epsilon servers, to determine current shares
        #  2b: based upon responses, create target map
        # 3: send slot_testv_and_readv_and_writev messages
        # 4: as responses return, update share-dispatch table
        # 4a: may need to run recovery algorithm
        # 5: when enough responses are back, we're done

        self.log("starting publish, datalen is %s" % len(newdata))
        if len(newdata) > self.MAX_SEGMENT_SIZE:
            raise FileTooLargeError("SDMF is limited to one segment, and "
                                    "%d > %d" % (len(newdata),
                                                 self.MAX_SEGMENT_SIZE))
        self._status.set_size(len(newdata))
        self._status.set_status("Started")
        self._started = time.time()

        self.done_deferred = defer.Deferred()

        self._writekey = self._node.get_writekey()
        assert self._writekey, "need write capability to publish"

        # first, which servers will we publish to? We require that the
        # servermap was updated in MODE_WRITE (the assertion below also
        # accepts MODE_CHECK), so we can depend upon the peerlist computed
        # by that process instead of computing our own.
        if self._servermap:
            assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
            # we will push a version that is one larger than anything present
            # in the grid, according to the servermap.
            self._new_seqnum = self._servermap.highest_seqnum() + 1
        else:
            # If we don't have a servermap, that's because we're doing the
            # initial publish
            self._new_seqnum = 1
            self._servermap = ServerMap()
        self._status.set_servermap(self._servermap)

        self.log(format="new seqnum will be %(seqnum)d",
                 seqnum=self._new_seqnum, level=log.NOISY)

        # having an up-to-date servermap (or using a filenode that was just
        # created for the first time) also guarantees that the following
        # fields are available
        self.readkey = self._node.get_readkey()
        self.required_shares = self._node.get_required_shares()
        assert self.required_shares is not None
        self.total_shares = self._node.get_total_shares()
        assert self.total_shares is not None
        self._status.set_encoding(self.required_shares, self.total_shares)

        self._pubkey = self._node.get_pubkey()
        assert self._pubkey
        self._privkey = self._node.get_privkey()
        assert self._privkey
        self._encprivkey = self._node.get_encprivkey()

        client = self._node._client
        full_peerlist = client.get_permuted_peers("storage",
                                                  self._storage_index)
        self.full_peerlist = full_peerlist # for use later, immutable
        self.bad_peers = set() # peerids who have errbacked/refused requests

        self.newdata = newdata
        self.salt = os.urandom(16)

        self.setup_encoding_parameters()

        # if we experience any surprises (writes which were rejected because
        # our test vector did not match, or shares which we didn't expect to
        # see), we set this flag and report an UncoordinatedWriteError at the
        # end of the publish process.
        self.surprised = False

        # as a failsafe, refuse to iterate through self.loop more than a
        # thousand times.
        self.looplimit = 1000

        # we keep track of three tables. The first is our goal: which share
        # we want to see on which servers. This is initially populated by the
        # existing servermap.
        self.goal = set() # pairs of (peerid, shnum) tuples

        # the second table is our list of outstanding queries: those which
        # are in flight and may or may not be delivered, accepted, or
        # acknowledged. Items are added to this table when the request is
        # sent, and removed when the response returns (or errbacks).
        self.outstanding = set() # (peerid, shnum) tuples

        # the third is a table of successes: share which have actually been
        # placed. These are populated when responses come back with success.
        # When self.placed == self.goal, we're done.
        self.placed = set() # (peerid, shnum) tuples

        # we also keep a mapping from peerid to RemoteReference. Each time we
        # pull a connection out of the full peerlist, we add it to this for
        # use later.
        self.connections = {}

        # maps (peerid, shnum) to the checkstring of a known-bad share that
        # we intend to overwrite
        self.bad_share_checkstrings = {}

        # we use the servermap to populate the initial goal: this way we will
        # try to update each existing share in place.
        for (peerid, shnum) in self._servermap.servermap:
            self.goal.add( (peerid, shnum) )
            self.connections[peerid] = self._servermap.connections[peerid]
        # then we add in all the shares that were bad (corrupted, bad
        # signatures, etc). We want to replace these.
        for (peerid, shnum, old_checkstring) in self._servermap.bad_shares:
            self.goal.add( (peerid, shnum) )
            self.bad_share_checkstrings[ (peerid, shnum) ] = old_checkstring
            self.connections[peerid] = self._servermap.connections[peerid]

        # create the shares. We'll discard these as they are delivered. SDMF:
        # we're allowed to hold everything in memory.

        self._status.timings["setup"] = time.time() - self._started
        d = self._encrypt_and_encode()
        d.addCallback(self._generate_shares)
        def _start_pushing(res):
            # loop() uses this timestamp to compute timings["push"]
            self._started_pushing = time.time()
            return res
        d.addCallback(_start_pushing)
        d.addCallback(self.loop) # trigger delivery
        d.addErrback(self._fatal_error)

        return self.done_deferred

    def setup_encoding_parameters(self):
        """Compute self.segment_size and self.num_segments from
        self.newdata and self.required_shares."""
        segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata))
        # this must be a multiple of self.required_shares
        segment_size = mathutil.next_multiple(segment_size,
                                              self.required_shares)
        self.segment_size = segment_size
        if segment_size:
            self.num_segments = mathutil.div_ceil(len(self.newdata),
                                                  segment_size)
        else:
            self.num_segments = 0
        assert self.num_segments in [0, 1,] # SDMF restrictions

    def _fatal_error(self, f):
        """Errback of last resort: log the failure and finish the publish
        with it."""
        self.log("error during loop", failure=f, level=log.UNUSUAL)
        self._done(f)

    def _update_status(self):
        """Refresh the status string and progress fraction from the sizes
        of self.placed, self.goal, and self.outstanding."""
        num_placed = len(self.placed)
        num_goal = len(self.goal)
        self._status.set_status("Sending Shares: %d placed out of %d, "
                                "%d messages outstanding" %
                                (num_placed,
                                 num_goal,
                                 len(self.outstanding)))
        # an empty goal would otherwise raise ZeroDivisionError here;
        # report no progress in that case
        if num_goal:
            progress = 1.0 * num_placed / num_goal
        else:
            progress = 0.0
        self._status.set_progress(progress)

    def loop(self, ignored=None):
        """Run one iteration of the publish state machine: send any shares
        that are still needed, keep waiting if queries are outstanding, or
        finish. Used as a Deferred callback, so 'ignored' is the previous
        callback's result.

        Raises RuntimeError once self.looplimit is used up."""
        self.log("entering loop", level=log.NOISY)
        if not self._running:
            return

        self.looplimit -= 1
        if self.looplimit <= 0:
            raise RuntimeError("loop limit exceeded")

        if self.surprised:
            # don't send out any new shares, just wait for the outstanding
            # ones to be retired.
            self.log("currently surprised, so don't send any new shares",
                     level=log.NOISY)
        else:
            self.update_goal()
            # how far are we from our goal?
            needed = self.goal - self.placed - self.outstanding
            self._update_status()

            if needed:
                # we need to send out new shares
                self.log(format="need to send %(needed)d new shares",
                         needed=len(needed), level=log.NOISY)
                self._send_shares(needed)
                return

        if self.outstanding:
            # queries are still pending, keep waiting
            self.log(format="%(outstanding)d queries still outstanding",
                     outstanding=len(self.outstanding),
                     level=log.NOISY)
            return

        # no queries outstanding, no placements needed: we're done
        self.log("no queries outstanding, no placements needed: done",
                 level=log.OPERATIONAL)
        now = time.time()
        elapsed = now - self._started_pushing
        self._status.timings["push"] = elapsed
        return self._done(None)

    def log_goal(self, goal, message=""):
        """Log the (peerid, shnum) pairs in 'goal', ordered by share number,
        followed by the sequence number we intend to push."""
        logmsg = [message]
        for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]):
            logmsg.append("sh%d to [%s]" % (shnum,
                                            idlib.shortnodeid_b2a(peerid)))
        self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
        self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
                 level=log.NOISY)

    def update_goal(self):
        """Drop bad peers from self.goal, then assign every share number
        that has no home to a peer from self.full_peerlist.

        Raises NotEnoughServersError if shares need a home and every peer
        in the full peerlist has been marked bad."""
        # TODO: only do this when noisy logging is being recorded
        self.log_goal(self.goal, "before update: ")

        # first, remove any bad peers from our goal
        self.goal = set([ (peerid, shnum)
                          for (peerid, shnum) in self.goal
                          if peerid not in self.bad_peers ])

        # find the homeless shares:
        homefull_shares = set([shnum for (peerid, shnum) in self.goal])
        homeless_shares = sorted(set(range(self.total_shares))
                                 - homefull_shares)
        # place them somewhere. We prefer unused servers at the beginning of
        # the available peer list.

        if not homeless_shares:
            return

        # if an old share X is on a node, put the new share X there too.
        # TODO: 1: redistribute shares to achieve one-per-peer, by copying
        #       shares from existing peers to new (less-crowded) ones. The
        #       old shares must still be updated.
        # TODO: 2: move those shares instead of copying them, to reduce future
        #       update work

        # this is a bit CPU intensive but easy to analyze. We create a sort
        # order for each peerid. If the peerid is marked as bad, we don't
        # even put them in the list. Then we care about the number of shares
        # which have already been assigned to them. After that we care about
        # their permutation order.
        old_assignments = DictOfSets()
        for (peerid, shnum) in self.goal:
            old_assignments.add(peerid, shnum)

        peerlist = []
        for i, (peerid, ss) in enumerate(self.full_peerlist):
            if peerid in self.bad_peers:
                continue
            entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
            peerlist.append(entry)
        peerlist.sort()

        if not peerlist:
            raise NotEnoughServersError("Ran out of non-bad servers")

        # walk the sorted peerlist round-robin, wrapping when there are more
        # homeless shares than candidate peers. We update the goal as we go.
        num_peers = len(peerlist)
        for i, shnum in enumerate(homeless_shares):
            (ignored1, ignored2, peerid, ss) = peerlist[i % num_peers]
            # TODO: if we are forced to send a share to a server that already
            # has one, we may have two write requests in flight, and the
            # servermap (which was computed before either request was sent)
            # won't reflect the new shares, so the second response will cause
            # us to be surprised ("unexpected share on peer"), causing the
            # publish to fail with an UncoordinatedWriteError. This is
            # troublesome but not really a big problem. Fix it at some point.
            self.goal.add( (peerid, shnum) )
            self.connections[peerid] = ss

        self.log_goal(self.goal, "after update: ")

    def _encrypt_and_encode(self):
        """Encrypt self.newdata and run it through the erasure coder.

        Returns the Deferred produced by the encoder; _generate_shares()
        consumes its result as a (shares, share_ids) pair. TODO: cache the
        ciphertext, only produce the shares that we care about."""
        self.log("_encrypt_and_encode")

        self._status.set_status("Encrypting")
        started = time.time()

        key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
        enc = AES(key)
        crypttext = enc.process(self.newdata)
        assert len(crypttext) == len(self.newdata)

        now = time.time()
        self._status.timings["encrypt"] = now - started
        started = now

        # now apply FEC

        self._status.set_status("Encoding")
        fec = codec.CRSEncoder()
        fec.set_params(self.segment_size,
                       self.required_shares, self.total_shares)
        piece_size = fec.get_block_size()
        # cut the ciphertext into required_shares pieces of piece_size bytes
        # each, zero-padding whatever falls short
        crypttext_pieces = []
        for i in range(self.required_shares):
            offset = i * piece_size
            piece = crypttext[offset:offset+piece_size]
            piece = piece + "\x00"*(piece_size - len(piece)) # padding
            assert len(piece) == piece_size
            crypttext_pieces.append(piece)

        d = fec.encode(crypttext_pieces)
        def _done_encoding(res):
            elapsed = time.time() - started
            self._status.timings["encode"] = elapsed
            return res
        d.addCallback(_done_encoding)
        return d

    def _generate_shares(self, shares_and_shareids):
        """Turn the encoder output into complete, signed shares.

        'shares_and_shareids' is a (shares, share_ids) pair of equal-length
        sequences. Sets self.shares (shnum -> packed share), self.root_hash,
        and self.versioninfo."""
        self.log("_generate_shares")
        self._status.set_status("Generating Shares")
        started = time.time()

        # we should know these by now
        privkey = self._privkey
        encprivkey = self._encprivkey
        pubkey = self._pubkey

        (shares, share_ids) = shares_and_shareids

        assert len(shares) == len(share_ids)
        assert len(shares) == self.total_shares
        all_shares = {}
        block_hash_trees = {}
        share_hash_leaves = [None] * len(shares)
        for (share_data, shnum) in zip(shares, share_ids):
            all_shares[shnum] = share_data

            # build the block hash tree. SDMF has only one leaf.
            leaves = [hashutil.block_hash(share_data)]
            t = hashtree.HashTree(leaves)
            block_hash_trees[shnum] = list(t)
            share_hash_leaves[shnum] = t[0]
        for leaf in share_hash_leaves:
            assert leaf is not None
        share_hash_tree = hashtree.HashTree(share_hash_leaves)
        share_hash_chain = {}
        for shnum in range(self.total_shares):
            needed_hashes = share_hash_tree.needed_hashes(shnum)
            share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
                                              for i in needed_hashes ] )
        root_hash = share_hash_tree[0]
        assert len(root_hash) == 32
        self.log("my new root_hash is %s" % base32.b2a(root_hash))

        prefix = pack_prefix(self._new_seqnum, root_hash, self.salt,
                             self.required_shares, self.total_shares,
                             self.segment_size, len(self.newdata))

        # now pack the beginning of the share. All shares are the same up
        # to the signature, then they have divergent share hash chains,
        # then completely different block hash trees + salt + share data,
        # then they all share the same encprivkey at the end. The sizes
        # of everything are the same for all shares.

        sign_started = time.time()
        signature = privkey.sign(prefix)
        self._status.timings["sign"] = time.time() - sign_started

        verification_key = pubkey.serialize()

        final_shares = {}
        for shnum in range(self.total_shares):
            final_share = pack_share(prefix,
                                     verification_key,
                                     signature,
                                     share_hash_chain[shnum],
                                     block_hash_trees[shnum],
                                     all_shares[shnum],
                                     encprivkey)
            final_shares[shnum] = final_share
        # note: "pack" is measured from the start of this method, so it
        # includes the hashing and signing above
        elapsed = time.time() - started
        self._status.timings["pack"] = elapsed
        self.shares = final_shares
        self.root_hash = root_hash

        # we also need to build up the version identifier for what we're
        # pushing. Extract the offsets from one of our shares.
        assert final_shares
        offsets = unpack_header(final_shares.values()[0])[-1]
        offsets_tuple = tuple(offsets.items())
        verinfo = (self._new_seqnum, root_hash, self.salt,
                   self.segment_size, len(self.newdata),
                   self.required_shares, self.total_shares,
                   prefix, offsets_tuple)
        self.versioninfo = verinfo

    def _make_test_vector(self, key):
        """Return the test vector guarding the write of share 'key', a
        (peerid, shnum) tuple: require the checkstring we believe is on the
        server, or require that no share exists."""
        sm = self._servermap.servermap
        if key in sm:
            # an old version of that share already exists on the
            # server, according to our servermap. We will create a
            # request that attempts to replace it.
            old_versionid, old_timestamp = sm[key]
            (old_seqnum, old_root_hash, old_salt, old_segsize,
             old_datalength, old_k, old_N, old_prefix,
             old_offsets_tuple) = old_versionid
            old_checkstring = pack_checkstring(old_seqnum,
                                               old_root_hash,
                                               old_salt)
            return (0, len(old_checkstring), "eq", old_checkstring)

        if key in self.bad_share_checkstrings:
            old_checkstring = self.bad_share_checkstrings[key]
            return (0, len(old_checkstring), "eq", old_checkstring)

        # add a testv that requires the share not exist
        #testv = (0, 1, 'eq', "")

        # Unfortunately, foolscap-0.2.5 has a bug in the way inbound
        # constraints are handled. If the same object is referenced
        # multiple times inside the arguments, foolscap emits a
        # 'reference' token instead of a distinct copy of the
        # argument. The bug is that these 'reference' tokens are not
        # accepted by the inbound constraint code. To work around
        # this, we need to prevent python from interning the
        # (constant) tuple, by creating a new copy of this vector
        # each time. This bug is fixed in later versions of foolscap.
        return tuple([0, 1, 'eq', ""])

    def _send_shares(self, needed):
        """Send one test-and-write message to each peer named in 'needed',
        a set of (peerid, shnum) tuples, and mark those shares as
        outstanding. Each response is routed to _got_write_answer or
        _got_write_error and then back into loop()."""
        self.log("_send_shares")

        # we're finally ready to send out our shares. If we encounter any
        # surprises here, it's because somebody else is writing at the same
        # time. (Note: in the future, when we remove the _query_peers() step
        # and instead speculate about [or remember] which shares are where,
        # surprises here are *not* indications of UncoordinatedWriteError,
        # and we'll need to respond to them more gracefully.)

        # the first thing is to build up a bunch of test vectors. The
        # semantics of Publish are that we perform the operation if the world
        # hasn't changed since the ServerMap was constructed (more or less).
        # For every share we're trying to place, we create a test vector that
        # tests to see if the server*share still corresponds to the
        # map.

        all_tw_vectors = {} # maps peerid to tw_vectors

        for key in needed:
            (peerid, shnum) = key

            testvs = [self._make_test_vector(key)]
            # the write vector is simply the share
            writev = [(0, self.shares[shnum])]

            if peerid not in all_tw_vectors:
                all_tw_vectors[peerid] = {}
                # maps shnum to (testvs, writevs, new_length)
            assert shnum not in all_tw_vectors[peerid]

            all_tw_vectors[peerid][shnum] = (testvs, writev, None)

        # we read the checkstring back from each share, however we only use
        # it to detect whether there was a new share that we didn't know
        # about. The success or failure of the write will tell us whether
        # there was a collision or not. If there is a collision, the first
        # thing we'll do is update the servermap, which will find out what
        # happened. We could conceivably reduce a roundtrip by using the
        # readv checkstring to populate the servermap, but really we'd have
        # to read enough data to validate the signatures too, so it wouldn't
        # be an overall win.
        read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]

        # ok, send the messages! (note: the counts logged below are the
        # number of per-peer messages, each of which may carry several
        # shares)
        self.log("sending %d shares" % len(all_tw_vectors), level=log.NOISY)
        started = time.time()
        for (peerid, tw_vectors) in all_tw_vectors.items():

            write_enabler = self._node.get_write_enabler(peerid)
            renew_secret = self._node.get_renewal_secret(peerid)
            cancel_secret = self._node.get_cancel_secret(peerid)
            secrets = (write_enabler, renew_secret, cancel_secret)
            shnums = tw_vectors.keys()

            for shnum in shnums:
                self.outstanding.add( (peerid, shnum) )

            d = self._do_testreadwrite(peerid, secrets,
                                       tw_vectors, read_vector)
            d.addCallbacks(self._got_write_answer, self._got_write_error,
                           callbackArgs=(peerid, shnums, started),
                           errbackArgs=(peerid, shnums, started))
            d.addCallback(self.loop)
            d.addErrback(self._fatal_error)

        self._update_status()
        self.log("%d shares sent" % len(all_tw_vectors), level=log.NOISY)

    def _do_testreadwrite(self, peerid, secrets,
                          tw_vectors, read_vector):
        """Invoke slot_testv_and_readv_and_writev on the connection we hold
        for 'peerid' and return the resulting Deferred."""
        storage_index = self._storage_index
        ss = self.connections[peerid]

        d = ss.callRemote("slot_testv_and_readv_and_writev",
                          storage_index,
                          secrets,
                          tw_vectors,
                          read_vector)
        return d

    def _got_write_answer(self, answer, peerid, shnums, started):
        """Handle a server's response to a write of 'shnums' to 'peerid'.

        'answer' is a (wrote, read_data) pair, where read_data maps share
        numbers to the data read back by our read vector. On success the
        shares are recorded as placed and added to the servermap; otherwise
        we become surprised and mark the peer as bad."""
        lp = self.log("_got_write_answer from %s" %
                      idlib.shortnodeid_b2a(peerid))
        for shnum in shnums:
            self.outstanding.discard( (peerid, shnum) )

        now = time.time()
        elapsed = now - started
        self._status.add_per_server_time(peerid, elapsed)

        wrote, read_data = answer

        surprise_shares = set(read_data.keys()) - set(shnums)
        if surprise_shares:
            self.log("they had shares %s that we didn't know about" %
                     (list(surprise_shares),),
                     parent=lp, level=log.WEIRD, umid="un9CSQ")
            self.surprised = True

        if not wrote:
            # TODO: there are two possibilities. The first is that the server
            # is full (or just doesn't want to give us any room), which means
            # we shouldn't ask them again, but is *not* an indication of an
            # uncoordinated write. The second is that our testv failed, which
            # *does* indicate an uncoordinated write. We currently don't have
            # a way to tell these two apart (in fact, the storage server code
            # doesn't have the option of refusing our share).
            #
            # Ideally, if the server is full we would mark the peer as bad
            # (so we don't ask them again) without setting self.surprised,
            # and let loop() find a new server; and if the testv failed we
            # would log it and set self.surprised without bothering to add
            # to self.bad_peers.
            #
            # Since we cannot distinguish the two cases yet, we currently do
            # both: set self.surprised *and* mark the peer as bad.

            self.log("our testv failed, so the write did not happen",
                     parent=lp, level=log.WEIRD, umid="8sc26g")
            self.surprised = True
            self.bad_peers.add(peerid) # don't ask them again
            # use the checkstring to add information to the log message
            for (shnum,readv) in read_data.items():
                checkstring = readv[0]
                (other_seqnum,
                 other_roothash,
                 other_salt) = unpack_checkstring(checkstring)
                expected_version = self._servermap.version_on_peer(peerid,
                                                                   shnum)
                if expected_version:
                    (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                     offsets_tuple) = expected_version
                    self.log("somebody modified the share on us:"
                             " shnum=%d: I thought they had #%d:R=%s,"
                             " but testv reported #%d:R=%s" %
                             (shnum,
                              seqnum, base32.b2a(root_hash)[:4],
                              other_seqnum, base32.b2a(other_roothash)[:4]),
                             parent=lp, level=log.NOISY)
                # if expected_version==None, then we didn't expect to see a
                # share on that peer, and the 'surprise_shares' clause above
                # will have logged it.
            # self.loop() will take care of finding new homes
            return

        for shnum in shnums:
            self.placed.add( (peerid, shnum) )
            # and update the servermap
            self._servermap.add_new_share(peerid, shnum,
                                          self.versioninfo, started)

        # self.loop() will take care of checking to see if we're done
        return

    def _got_write_error(self, f, peerid, shnums, started):
        """Handle a failed write to 'peerid': retire the outstanding
        entries for 'shnums', mark the peer as bad, and log the failure."""
        for shnum in shnums:
            self.outstanding.discard( (peerid, shnum) )
        self.bad_peers.add(peerid)
        self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s",
                 shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid),
                 failure=f,
                 level=log.UNUSUAL)
        # self.loop() will take care of checking to see if we're done
        return

    def _done(self, res):
        """Finish the publish exactly once: record the total time, set the
        final status, and fire self.done_deferred with 'res' (replaced by an
        UncoordinatedWriteError failure if we were surprised). Calls made
        after the first are ignored."""
        if not self._running:
            return
        self._running = False
        now = time.time()
        self._status.timings["total"] = now - self._started
        self._status.set_active(False)
        if isinstance(res, failure.Failure):
            self.log("Publish done, with failure", failure=res,
                     level=log.WEIRD, umid="nRsR9Q")
            self._status.set_status("Failed")
        elif self.surprised:
            self.log("Publish done, UncoordinatedWriteError", level=log.UNUSUAL)
            self._status.set_status("UncoordinatedWriteError")
            # deliver a failure
            res = failure.Failure(UncoordinatedWriteError())
            # TODO: recovery
        else:
            self.log("Publish done, success")
            self._status.set_status("Done")
            self._status.set_progress(1.0)
        eventually(self.done_deferred.callback, res)
758
759