

import os, struct, time
from itertools import count
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from allmydata.interfaces import IPublishStatus, FileTooLargeError
from allmydata.util import base32, hashutil, mathutil, idlib, log
from allmydata import hashtree, codec, storage
from pycryptopp.cipher.aes import AES
from foolscap.eventual import eventually

from common import MODE_WRITE, MODE_CHECK, DictOfSets, \
     UncoordinatedWriteError, NotEnoughServersError
from servermap import ServerMap
from layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
     unpack_checkstring, SIGNED_PREFIX

class PublishStatus:
    implements(IPublishStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["send_per_server"] = {}
        self.servermap = None
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        self.encoding = ("?", "?")
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def add_per_server_time(self, peerid, elapsed):
        if peerid not in self.timings["send_per_server"]:
            self.timings["send_per_server"][peerid] = []
        self.timings["send_per_server"][peerid].append(elapsed)

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_encoding(self):
        return self.encoding
    def using_helper(self):
        return self.helper
    def get_servermap(self):
        return self.servermap
    def get_size(self):
        return self.size
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
        self.helper = helper
    def set_servermap(self, servermap):
        self.servermap = servermap
    def set_encoding(self, k, n):
        self.encoding = (k, n)
    def set_size(self, size):
        self.size = size
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value

class Publish:
    """I represent a single act of publishing the mutable file to the grid. I
    will only publish my data if the servermap I am using still represents
    the current state of the world.

    To make the initial publish, set servermap to None.
    """

    # we limit the segment size as usual to constrain our memory footprint.
    # The max segsize is higher for mutable files, because we want to support
    # dirnodes with up to 10k children, and each child uses about 330 bytes.
    # If you actually put that much into a directory you'll be using a
    # footprint of around 14MB, which is higher than we'd like, but it is
    # more important right now to support large directories than to make
    # memory usage small when you use them. Once we implement MDMF (with
    # multiple segments), we will drop this back down, probably to 128KiB.
    MAX_SEGMENT_SIZE = 3500000
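    # (Illustrative arithmetic, with the footprint breakdown being an
    # assumption: 10k children * ~330 bytes = ~3.3MB of directory data,
    # which is what the 3.5MB limit is sized for; the ~14MB figure
    # plausibly comes from holding plaintext, ciphertext, and the
    # FEC-expanded shares in memory at once.)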

    def __init__(self, filenode, servermap):
        self._node = filenode
        self._servermap = servermap
        self._storage_index = self._node.get_storage_index()
        self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
        num = self._node._client.log("Publish(%s): starting" % prefix)
        self._log_number = num
        self._running = True

        self._status = PublishStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if 'parent' not in kwargs:
            kwargs['parent'] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.publish"
        return log.msg(*args, **kwargs)

    def publish(self, newdata):
        """Publish the filenode's current contents.  Returns a Deferred that
        fires (with None) when the publish has done as much work as it's ever
        going to do, or errbacks with UncoordinatedWriteError if it detects a
        simultaneous write.
        """

        # 1: generate shares (SDMF: files are small, so we can do it in RAM)
        # 2: perform peer selection, get candidate servers
        #  2a: send queries to n+epsilon servers, to determine current shares
        #  2b: based upon responses, create target map
        # 3: send slot_testv_and_readv_and_writev messages
        # 4: as responses return, update share-dispatch table
        # 4a: may need to run recovery algorithm
        # 5: when enough responses are back, we're done

        self.log("starting publish, datalen is %s" % len(newdata))
        if len(newdata) > self.MAX_SEGMENT_SIZE:
            raise FileTooLargeError("SDMF is limited to one segment, and "
                                    "%d > %d" % (len(newdata),
                                                 self.MAX_SEGMENT_SIZE))
        self._status.set_size(len(newdata))
        self._status.set_status("Started")
        self._started = time.time()

        self.done_deferred = defer.Deferred()

        self._writekey = self._node.get_writekey()
        assert self._writekey, "need write capability to publish"

        # first, which servers will we publish to? We require that the
        # servermap was updated in MODE_WRITE, so we can depend upon the
        # peerlist computed by that process instead of computing our own.
        if self._servermap:
            assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
            # we will push a version that is one larger than anything present
            # in the grid, according to the servermap.
            self._new_seqnum = self._servermap.highest_seqnum() + 1
        else:
            # If we don't have a servermap, that's because we're doing the
            # initial publish
            self._new_seqnum = 1
            self._servermap = ServerMap()
        self._status.set_servermap(self._servermap)

        self.log(format="new seqnum will be %(seqnum)d",
                 seqnum=self._new_seqnum, level=log.NOISY)

        # having an up-to-date servermap (or using a filenode that was just
        # created for the first time) also guarantees that the following
        # fields are available
        self.readkey = self._node.get_readkey()
        self.required_shares = self._node.get_required_shares()
        assert self.required_shares is not None
        self.total_shares = self._node.get_total_shares()
        assert self.total_shares is not None
        self._status.set_encoding(self.required_shares, self.total_shares)

        self._pubkey = self._node.get_pubkey()
        assert self._pubkey
        self._privkey = self._node.get_privkey()
        assert self._privkey
        self._encprivkey = self._node.get_encprivkey()

        client = self._node._client
        full_peerlist = client.get_permuted_peers("storage",
                                                  self._storage_index)
        self.full_peerlist = full_peerlist # for use later, immutable
        self.bad_peers = set() # peerids who have errbacked/refused requests

        self.newdata = newdata
        self.salt = os.urandom(16)

        self.setup_encoding_parameters()

        # if we experience any surprises (writes which were rejected because
        # our test vector did not match, or shares which we didn't expect to
        # see), we set this flag and report an UncoordinatedWriteError at the
        # end of the publish process.
        self.surprised = False

        # as a failsafe, refuse to iterate through self.loop more than a
        # thousand times.
        self.looplimit = 1000

        # we keep track of three tables. The first is our goal: which shares
        # we want to see on which servers. This is initially populated by the
        # existing servermap.
        self.goal = set() # pairs of (peerid, shnum) tuples

        # the second table is our list of outstanding queries: those which
        # are in flight and may or may not be delivered, accepted, or
        # acknowledged. Items are added to this table when the request is
        # sent, and removed when the response returns (or errbacks).
        self.outstanding = set() # (peerid, shnum) tuples

        # the third is a table of successes: shares which have actually been
        # placed. These are populated when responses come back with success.
        # When self.placed == self.goal, we're done.
        self.placed = set() # (peerid, shnum) tuples

        # we also keep a mapping from peerid to RemoteReference. Each time we
        # pull a connection out of the full peerlist, we add it to this for
        # use later.
        self.connections = {}

        self.bad_share_checkstrings = {}

        # we use the servermap to populate the initial goal: this way we will
        # try to update each existing share in place.
        for (peerid, shnum) in self._servermap.servermap:
            self.goal.add( (peerid, shnum) )
            self.connections[peerid] = self._servermap.connections[peerid]
        # then we add in all the shares that were bad (corrupted, bad
        # signatures, etc). We want to replace these.
        for (peerid, shnum, old_checkstring) in self._servermap.bad_shares:
            self.goal.add( (peerid, shnum) )
            self.bad_share_checkstrings[ (peerid, shnum) ] = old_checkstring
            self.connections[peerid] = self._servermap.connections[peerid]

        # create the shares. We'll discard these as they are delivered. SDMF:
        # we're allowed to hold everything in memory.

        self._status.timings["setup"] = time.time() - self._started
        d = self._encrypt_and_encode()
        d.addCallback(self._generate_shares)
        def _start_pushing(res):
            self._started_pushing = time.time()
            return res
        d.addCallback(_start_pushing)
        d.addCallback(self.loop) # trigger delivery
        d.addErrback(self._fatal_error)

        return self.done_deferred

    def setup_encoding_parameters(self):
        segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata))
        # this must be a multiple of self.required_shares
        segment_size = mathutil.next_multiple(segment_size,
                                              self.required_shares)
        self.segment_size = segment_size
        if segment_size:
            self.num_segments = mathutil.div_ceil(len(self.newdata),
                                                  segment_size)
        else:
            self.num_segments = 0
        assert self.num_segments in [0, 1,] # SDMF restrictions
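        # (Worked example with assumed numbers: for required_shares=3 and a
        # 1000-byte file, segment_size = next_multiple(1000, 3) = 1002 and
        # num_segments = div_ceil(1000, 1002) = 1; an empty file gives
        # segment_size = 0 and num_segments = 0.)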

    def _fatal_error(self, f):
        self.log("error during loop", failure=f, level=log.UNUSUAL)
        self._done(f)

    def _update_status(self):
        self._status.set_status("Sending Shares: %d placed out of %d, "
                                "%d messages outstanding" %
                                (len(self.placed),
                                 len(self.goal),
                                 len(self.outstanding)))
        self._status.set_progress(1.0 * len(self.placed) / len(self.goal))

    def loop(self, ignored=None):
        self.log("entering loop", level=log.NOISY)
        if not self._running:
            return

        self.looplimit -= 1
        if self.looplimit <= 0:
            raise RuntimeError("loop limit exceeded")

        if self.surprised:
            # don't send out any new shares, just wait for the outstanding
            # ones to be retired.
            self.log("currently surprised, so don't send any new shares",
                     level=log.NOISY)
        else:
            self.update_goal()
            # how far are we from our goal?
            needed = self.goal - self.placed - self.outstanding
            self._update_status()

            if needed:
                # we need to send out new shares
                self.log(format="need to send %(needed)d new shares",
                         needed=len(needed), level=log.NOISY)
                self._send_shares(needed)
                return

        if self.outstanding:
            # queries are still pending, keep waiting
            self.log(format="%(outstanding)d queries still outstanding",
                     outstanding=len(self.outstanding),
                     level=log.NOISY)
            return

        # no queries outstanding, no placements needed: we're done
        self.log("no queries outstanding, no placements needed: done",
                 level=log.OPERATIONAL)
        now = time.time()
        elapsed = now - self._started_pushing
        self._status.timings["push"] = elapsed
        return self._done(None)

    def log_goal(self, goal, message=""):
        logmsg = [message]
        for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]):
            logmsg.append("sh%d to [%s]" % (shnum,
                                            idlib.shortnodeid_b2a(peerid)))
        self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
        self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
                 level=log.NOISY)

    def update_goal(self):
        # if log.recording_noisy
        if True:
            self.log_goal(self.goal, "before update: ")

        # first, remove any bad peers from our goal
        self.goal = set([ (peerid, shnum)
                          for (peerid, shnum) in self.goal
                          if peerid not in self.bad_peers ])

        # find the homeless shares:
        homefull_shares = set([shnum for (peerid, shnum) in self.goal])
        homeless_shares = set(range(self.total_shares)) - homefull_shares
        homeless_shares = sorted(list(homeless_shares))
        # place them somewhere. We prefer unused servers at the beginning of
        # the available peer list.

        if not homeless_shares:
            return

        # if an old share X is on a node, put the new share X there too.
        # TODO: 1: redistribute shares to achieve one-per-peer, by copying
        #       shares from existing peers to new (less-crowded) ones. The
        #       old shares must still be updated.
        # TODO: 2: move those shares instead of copying them, to reduce future
        #       update work

        # this is a bit CPU intensive but easy to analyze. We create a sort
        # order for each peerid. If a peerid is marked as bad, we don't even
        # put it in the list. Then we care about the number of shares which
        # have already been assigned to it. After that we care about its
        # permutation order.
        old_assignments = DictOfSets()
        for (peerid, shnum) in self.goal:
            old_assignments.add(peerid, shnum)

        peerlist = []
        for i, (peerid, ss) in enumerate(self.full_peerlist):
            if peerid in self.bad_peers:
                continue
            entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
            peerlist.append(entry)
        peerlist.sort()
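        # (Tuples sort lexicographically, so an entry like (0, 7, ...) -- a
        # peer holding no shares, seventh in the permuted list -- sorts ahead
        # of (1, 2, ...): peers with fewer existing shares win, with
        # permutation order as the tie-breaker.)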

        if not peerlist:
            raise NotEnoughServersError("Ran out of non-bad servers")

        new_assignments = []
        # we then index this peerlist with an integer, because we may have to
        # wrap. We update the goal as we go.
        i = 0
        for shnum in homeless_shares:
            (ignored1, ignored2, peerid, ss) = peerlist[i]
            # TODO: if we are forced to send a share to a server that already
            # has one, we may have two write requests in flight, and the
            # servermap (which was computed before either request was sent)
            # won't reflect the new shares, so the second response will cause
            # us to be surprised ("unexpected share on peer"), causing the
            # publish to fail with an UncoordinatedWriteError. This is
            # troublesome but not really a big problem. Fix it at some point.
            self.goal.add( (peerid, shnum) )
            self.connections[peerid] = ss
            i += 1
            if i >= len(peerlist):
                i = 0
        if True:
            self.log_goal(self.goal, "after update: ")



    def _encrypt_and_encode(self):
        # this returns a Deferred that fires with a list of (sharedata,
        # sharenum) tuples. TODO: cache the ciphertext, only produce the
        # shares that we care about.
        self.log("_encrypt_and_encode")

        self._status.set_status("Encrypting")
        started = time.time()

        key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
        enc = AES(key)
        crypttext = enc.process(self.newdata)
        assert len(crypttext) == len(self.newdata)

        now = time.time()
        self._status.timings["encrypt"] = now - started
        started = now

        # now apply FEC

        self._status.set_status("Encoding")
        fec = codec.CRSEncoder()
        fec.set_params(self.segment_size,
                       self.required_shares, self.total_shares)
        piece_size = fec.get_block_size()
        crypttext_pieces = [None] * self.required_shares
        for i in range(len(crypttext_pieces)):
            offset = i * piece_size
            piece = crypttext[offset:offset+piece_size]
            piece = piece + "\x00"*(piece_size - len(piece)) # padding
            crypttext_pieces[i] = piece
            assert len(piece) == piece_size

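        # (Illustrative, with assumed numbers: for required_shares=3 and
        # segment_size=1002, the FEC block size should be 1002/3 = 334, so a
        # 1000-byte ciphertext splits into pieces of 334, 334, and 332 bytes,
        # and the loop above zero-pads the last one to 334 before encoding.)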
        d = fec.encode(crypttext_pieces)
        def _done_encoding(res):
            elapsed = time.time() - started
            self._status.timings["encode"] = elapsed
            return res
        d.addCallback(_done_encoding)
        return d

    def _generate_shares(self, shares_and_shareids):
        # this sets self.shares and self.root_hash
        self.log("_generate_shares")
        self._status.set_status("Generating Shares")
        started = time.time()

        # we should know these by now
        privkey = self._privkey
        encprivkey = self._encprivkey
        pubkey = self._pubkey

        (shares, share_ids) = shares_and_shareids

        assert len(shares) == len(share_ids)
        assert len(shares) == self.total_shares
        all_shares = {}
        block_hash_trees = {}
        share_hash_leaves = [None] * len(shares)
        for i in range(len(shares)):
            share_data = shares[i]
            shnum = share_ids[i]
            all_shares[shnum] = share_data

            # build the block hash tree. SDMF has only one leaf.
            leaves = [hashutil.block_hash(share_data)]
            t = hashtree.HashTree(leaves)
            block_hash_trees[shnum] = block_hash_tree = list(t)
            share_hash_leaves[shnum] = t[0]
        for leaf in share_hash_leaves:
            assert leaf is not None
        share_hash_tree = hashtree.HashTree(share_hash_leaves)
        share_hash_chain = {}
        for shnum in range(self.total_shares):
            needed_hashes = share_hash_tree.needed_hashes(shnum)
            share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
                                              for i in needed_hashes ] )
        root_hash = share_hash_tree[0]
        assert len(root_hash) == 32
        self.log("my new root_hash is %s" % base32.b2a(root_hash))

        prefix = pack_prefix(self._new_seqnum, root_hash, self.salt,
                             self.required_shares, self.total_shares,
                             self.segment_size, len(self.newdata))

        # now pack the beginning of the share. All shares are the same up
        # to the signature, then they have divergent share hash chains,
        # then completely different block hash trees + salt + share data,
        # then they all share the same encprivkey at the end. The sizes
        # of everything are the same for all shares.

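        # (Rough sketch of a share, inferred from the pack_share() arguments
        # below rather than from layout.py itself:
        #   prefix | verification key | signature | share hash chain |
        #   block hash tree | share data | encprivkey
        # plus an offsets table locating the variable-length pieces; see
        # layout.py for the authoritative format.)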
        sign_started = time.time()
        signature = privkey.sign(prefix)
        self._status.timings["sign"] = time.time() - sign_started

        verification_key = pubkey.serialize()

        final_shares = {}
        for shnum in range(self.total_shares):
            final_share = pack_share(prefix,
                                     verification_key,
                                     signature,
                                     share_hash_chain[shnum],
                                     block_hash_trees[shnum],
                                     all_shares[shnum],
                                     encprivkey)
            final_shares[shnum] = final_share
        elapsed = time.time() - started
        self._status.timings["pack"] = elapsed
        self.shares = final_shares
        self.root_hash = root_hash

        # we also need to build up the version identifier for what we're
        # pushing. Extract the offsets from one of our shares.
        assert final_shares
        offsets = unpack_header(final_shares.values()[0])[-1]
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
        verinfo = (self._new_seqnum, root_hash, self.salt,
                   self.segment_size, len(self.newdata),
                   self.required_shares, self.total_shares,
                   prefix, offsets_tuple)
        self.versioninfo = verinfo



    def _send_shares(self, needed):
        self.log("_send_shares")

        # we're finally ready to send out our shares. If we encounter any
        # surprises here, it's because somebody else is writing at the same
        # time. (Note: in the future, when we remove the _query_peers() step
        # and instead speculate about [or remember] which shares are where,
        # surprises here are *not* indications of UncoordinatedWriteError,
        # and we'll need to respond to them more gracefully.)

        # needed is a set of (peerid, shnum) tuples. The first thing we do is
        # organize it by peerid.

        peermap = DictOfSets()
        for (peerid, shnum) in needed:
            peermap.add(peerid, shnum)

        # the next thing is to build up a bunch of test vectors. The
        # semantics of Publish are that we perform the operation if the world
        # hasn't changed since the ServerMap was constructed (more or less).
        # For every share we're trying to place, we create a test vector that
        # tests whether that server's copy of the share still corresponds to
        # the map.

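        # (For orientation, mirroring the structures built below: each peer
        # gets tw_vectors = { shnum: (testvs, writev, new_length) }, where
        # testvs is a list of (offset, length, "eq", specimen) checks that
        # must all pass before writev, a list of (offset, data) writes, is
        # applied; new_length is left as None here.)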
        all_tw_vectors = {} # maps peerid to tw_vectors
        sm = self._servermap.servermap

        for key in needed:
            (peerid, shnum) = key

            if key in sm:
                # an old version of that share already exists on the
                # server, according to our servermap. We will create a
                # request that attempts to replace it.
                old_versionid, old_timestamp = sm[key]
                (old_seqnum, old_root_hash, old_salt, old_segsize,
                 old_datalength, old_k, old_N, old_prefix,
                 old_offsets_tuple) = old_versionid
                old_checkstring = pack_checkstring(old_seqnum,
                                                   old_root_hash,
                                                   old_salt)
                testv = (0, len(old_checkstring), "eq", old_checkstring)

            elif key in self.bad_share_checkstrings:
                old_checkstring = self.bad_share_checkstrings[key]
                testv = (0, len(old_checkstring), "eq", old_checkstring)

            else:
                # add a testv that requires the share not exist
                testv = (0, 1, 'eq', "")

            testvs = [testv]
            # the write vector is simply the share
            writev = [(0, self.shares[shnum])]

            if peerid not in all_tw_vectors:
                all_tw_vectors[peerid] = {}
                # maps shnum to (testvs, writevs, new_length)
            assert shnum not in all_tw_vectors[peerid]

            all_tw_vectors[peerid][shnum] = (testvs, writev, None)

        # we read the checkstring back from each share, but we only use it
        # to detect whether there was a new share that we didn't know
        # about. The success or failure of the write will tell us whether
        # there was a collision or not. If there is a collision, the first
        # thing we'll do is update the servermap, which will find out what
        # happened. We could conceivably reduce a roundtrip by using the
        # readv checkstring to populate the servermap, but really we'd have
        # to read enough data to validate the signatures too, so it wouldn't
        # be an overall win.
        read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]
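        # (i.e. ask each server to return the first
        # struct.calcsize(SIGNED_PREFIX) bytes of every share it holds, which
        # is enough to unpack the checkstring in _got_write_answer below.)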

        # ok, send the messages!
        self.log("sending %d shares" % len(all_tw_vectors), level=log.NOISY)
        started = time.time()
        for (peerid, tw_vectors) in all_tw_vectors.items():

            write_enabler = self._node.get_write_enabler(peerid)
            renew_secret = self._node.get_renewal_secret(peerid)
            cancel_secret = self._node.get_cancel_secret(peerid)
            secrets = (write_enabler, renew_secret, cancel_secret)
            shnums = tw_vectors.keys()

            for shnum in shnums:
                self.outstanding.add( (peerid, shnum) )

            d = self._do_testreadwrite(peerid, secrets,
                                       tw_vectors, read_vector)
            d.addCallbacks(self._got_write_answer, self._got_write_error,
                           callbackArgs=(peerid, shnums, started),
                           errbackArgs=(peerid, shnums, started))
            d.addCallback(self.loop)
            d.addErrback(self._fatal_error)

        self._update_status()
        self.log("%d shares sent" % len(all_tw_vectors), level=log.NOISY)

    def _do_testreadwrite(self, peerid, secrets,
                          tw_vectors, read_vector):
        storage_index = self._storage_index
        ss = self.connections[peerid]

        #print "SS[%s] is %s" % (idlib.shortnodeid_b2a(peerid), ss), ss.tracker.interfaceName
        d = ss.callRemote("slot_testv_and_readv_and_writev",
                          storage_index,
                          secrets,
                          tw_vectors,
                          read_vector)
        return d

    def _got_write_answer(self, answer, peerid, shnums, started):
        lp = self.log("_got_write_answer from %s" %
                      idlib.shortnodeid_b2a(peerid))
        for shnum in shnums:
            self.outstanding.discard( (peerid, shnum) )

        now = time.time()
        elapsed = now - started
        self._status.add_per_server_time(peerid, elapsed)

        wrote, read_data = answer

        surprise_shares = set(read_data.keys()) - set(shnums)
        if surprise_shares:
            self.log("they had shares %s that we didn't know about" %
                     (list(surprise_shares),),
                     parent=lp, level=log.WEIRD, umid="un9CSQ")
            self.surprised = True

        if not wrote:
            # TODO: there are two possibilities. The first is that the server
            # is full (or just doesn't want to give us any room), which means
            # we shouldn't ask them again, but is *not* an indication of an
            # uncoordinated write. The second is that our testv failed, which
            # *does* indicate an uncoordinated write. We currently don't have
            # a way to tell these two apart (in fact, the storage server code
            # doesn't have the option of refusing our share).
            #
            # If the server is full, mark the peer as bad (so we don't ask
            # them again), but don't set self.surprised. The loop() will find
            # a new server.
            #
            # If the testv failed, log it, set self.surprised, but don't
            # bother adding to self.bad_peers .

            self.log("our testv failed, so the write did not happen",
                     parent=lp, level=log.WEIRD, umid="8sc26g")
            self.surprised = True
            self.bad_peers.add(peerid) # don't ask them again
            # use the checkstring to add information to the log message
            for (shnum,readv) in read_data.items():
                checkstring = readv[0]
                (other_seqnum,
                 other_roothash,
                 other_salt) = unpack_checkstring(checkstring)
                expected_version = self._servermap.version_on_peer(peerid,
                                                                   shnum)
                if expected_version:
                    (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                     offsets_tuple) = expected_version
                    self.log("somebody modified the share on us:"
                             " shnum=%d: I thought they had #%d:R=%s,"
                             " but testv reported #%d:R=%s" %
                             (shnum,
                              seqnum, base32.b2a(root_hash)[:4],
                              other_seqnum, base32.b2a(other_roothash)[:4]),
                             parent=lp, level=log.NOISY)
                # if expected_version==None, then we didn't expect to see a
                # share on that peer, and the 'surprise_shares' clause above
                # will have logged it.
            # self.loop() will take care of finding new homes
            return

        for shnum in shnums:
            self.placed.add( (peerid, shnum) )
            # and update the servermap
            self._servermap.add_new_share(peerid, shnum,
                                          self.versioninfo, started)

        # self.loop() will take care of checking to see if we're done
        return

    def _got_write_error(self, f, peerid, shnums, started):
        for shnum in shnums:
            self.outstanding.discard( (peerid, shnum) )
        self.bad_peers.add(peerid)
        self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s",
                 shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid),
                 failure=f,
                 level=log.UNUSUAL)
        # self.loop() will take care of checking to see if we're done
        return


    def _done(self, res):
        if not self._running:
            return
        self._running = False
        now = time.time()
        self._status.timings["total"] = now - self._started
        self._status.set_active(False)
        if isinstance(res, failure.Failure):
            self.log("Publish done, with failure", failure=res,
                     level=log.WEIRD, umid="nRsR9Q")
            self._status.set_status("Failed")
        elif self.surprised:
            self.log("Publish done, UncoordinatedWriteError", level=log.UNUSUAL)
            self._status.set_status("UncoordinatedWriteError")
            # deliver a failure
            res = failure.Failure(UncoordinatedWriteError())
            # TODO: recovery
        else:
            self.log("Publish done, success")
            self._status.set_status("Done")
            self._status.set_progress(1.0)
        eventually(self.done_deferred.callback, res)