

import os, struct, time
from itertools import count
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from allmydata.interfaces import IPublishStatus, FileTooLargeError
from allmydata.util import base32, hashutil, mathutil, idlib, log
from allmydata import hashtree, codec, storage
from pycryptopp.cipher.aes import AES
from foolscap.eventual import eventually

from common import MODE_WRITE, DictOfSets, \
     UncoordinatedWriteError, NotEnoughServersError
from servermap import ServerMap
from layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
     unpack_checkstring, SIGNED_PREFIX

class PublishStatus:
    implements(IPublishStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["send_per_server"] = {}
        self.servermap = None
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        self.encoding = ("?", "?")
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def add_per_server_time(self, peerid, elapsed):
        if peerid not in self.timings["send_per_server"]:
            self.timings["send_per_server"][peerid] = []
        self.timings["send_per_server"][peerid].append(elapsed)

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_encoding(self):
        return self.encoding
    def using_helper(self):
        return self.helper
    def get_servermap(self):
        return self.servermap
    def get_size(self):
        return self.size
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
        self.helper = helper
    def set_servermap(self, servermap):
        self.servermap = servermap
    def set_encoding(self, k, n):
        self.encoding = (k, n)
    def set_size(self, size):
        self.size = size
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value

class Publish:
    """I represent a single act of publishing the mutable file to the grid. I
    will only publish my data if the servermap I am using still represents
    the current state of the world.

    To make the initial publish, set servermap to None.
    """

    # we limit the segment size as usual to constrain our memory footprint.
    # The max segsize is higher for mutable files, because we want to support
    # dirnodes with up to 10k children, and each child uses about 330 bytes.
    # If you actually put that much into a directory you'll be using a
    # footprint of around 14MB, which is higher than we'd like, but it is
    # more important right now to support large directories than to make
    # memory usage small when you use them. Once we implement MDMF (with
    # multiple segments), we will drop this back down, probably to 128KiB.
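    # (Illustrative arithmetic behind the number: 10k children * ~330 bytes
    # each is about 3.3MB of directory data, which fits under the 3.5MB cap
    # below; the ~14MB figure above presumably reflects the several transient
    # copies of that data held in RAM during encryption and encoding.)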
    MAX_SEGMENT_SIZE = 3500000

    def __init__(self, filenode, servermap):
        self._node = filenode
        self._servermap = servermap
        self._storage_index = self._node.get_storage_index()
        self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
        num = self._node._client.log("Publish(%s): starting" % prefix)
        self._log_number = num
        self._running = True

        self._status = PublishStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if 'parent' not in kwargs:
            kwargs['parent'] = self._log_number
        return log.msg(*args, **kwargs)

    def publish(self, newdata):
        """Publish the filenode's current contents.  Returns a Deferred that
        fires (with None) when the publish has done as much work as it's ever
        going to do, or errbacks with UncoordinatedWriteError if it detects a
        simultaneous write.
        """

        # 1: generate shares (SDMF: files are small, so we can do it in RAM)
        # 2: perform peer selection, get candidate servers
        #  2a: send queries to n+epsilon servers, to determine current shares
        #  2b: based upon responses, create target map
        # 3: send slot_testv_and_readv_and_writev messages
        # 4: as responses return, update share-dispatch table
        # 4a: may need to run recovery algorithm
        # 5: when enough responses are back, we're done

        self.log("starting publish, datalen is %s" % len(newdata))
        if len(newdata) > self.MAX_SEGMENT_SIZE:
            raise FileTooLargeError("SDMF is limited to one segment, and "
                                    "%d > %d" % (len(newdata),
                                                 self.MAX_SEGMENT_SIZE))
        self._status.set_size(len(newdata))
        self._status.set_status("Started")
        self._started = time.time()

        self.done_deferred = defer.Deferred()

        self._writekey = self._node.get_writekey()
        assert self._writekey, "need write capability to publish"

        # first, which servers will we publish to? We require that the
        # servermap was updated in MODE_WRITE, so we can depend upon the
        # peerlist computed by that process instead of computing our own.
        if self._servermap:
            assert self._servermap.last_update_mode == MODE_WRITE
            # we will push a version that is one larger than anything present
            # in the grid, according to the servermap.
            self._new_seqnum = self._servermap.highest_seqnum() + 1
        else:
            # If we don't have a servermap, that's because we're doing the
            # initial publish
            self._new_seqnum = 1
            self._servermap = ServerMap()
        self._status.set_servermap(self._servermap)

        self.log(format="new seqnum will be %(seqnum)d",
                 seqnum=self._new_seqnum, level=log.NOISY)

        # having an up-to-date servermap (or using a filenode that was just
        # created for the first time) also guarantees that the following
        # fields are available
        self.readkey = self._node.get_readkey()
        self.required_shares = self._node.get_required_shares()
        assert self.required_shares is not None
        self.total_shares = self._node.get_total_shares()
        assert self.total_shares is not None
        self._status.set_encoding(self.required_shares, self.total_shares)

        self._pubkey = self._node.get_pubkey()
        assert self._pubkey
        self._privkey = self._node.get_privkey()
        assert self._privkey
        self._encprivkey = self._node.get_encprivkey()

        client = self._node._client
        full_peerlist = client.get_permuted_peers("storage",
                                                  self._storage_index)
        self.full_peerlist = full_peerlist # for use later, immutable
        self.bad_peers = set() # peerids who have errbacked/refused requests

        self.newdata = newdata
        self.salt = os.urandom(16)

        self.setup_encoding_parameters()

        # if we experience any surprises (writes which were rejected because
        # our test vector did not match, or shares which we didn't expect to
        # see), we set this flag and report an UncoordinatedWriteError at the
        # end of the publish process.
        self.surprised = False

        # as a failsafe, refuse to iterate through self.loop more than a
        # thousand times.
        self.looplimit = 1000

        # we keep track of three tables. The first is our goal: which shares
        # we want to see on which servers. This is initially populated by the
        # existing servermap.
        self.goal = set() # pairs of (peerid, shnum) tuples

        # the second table is our list of outstanding queries: those which
        # are in flight and may or may not be delivered, accepted, or
        # acknowledged. Items are added to this table when the request is
        # sent, and removed when the response returns (or errbacks).
        self.outstanding = set() # (peerid, shnum) tuples

        # the third is a table of successes: shares which have actually been
        # placed. These are populated when responses come back with success.
        # When self.placed == self.goal, we're done.
        self.placed = set() # (peerid, shnum) tuples

        # we also keep a mapping from peerid to RemoteReference. Each time we
        # pull a connection out of the full peerlist, we add it to this for
        # use later.
        self.connections = {}

        # we use the servermap to populate the initial goal: this way we will
        # try to update each existing share in place.
        for (peerid, shnum) in self._servermap.servermap:
            self.goal.add( (peerid, shnum) )
            self.connections[peerid] = self._servermap.connections[peerid]

        # create the shares. We'll discard these as they are delivered. SDMF:
        # we're allowed to hold everything in memory.

        self._status.timings["setup"] = time.time() - self._started
        d = self._encrypt_and_encode()
        d.addCallback(self._generate_shares)
        def _start_pushing(res):
            self._started_pushing = time.time()
            return res
        d.addCallback(_start_pushing)
        d.addCallback(self.loop) # trigger delivery
        d.addErrback(self._fatal_error)

        return self.done_deferred

    def setup_encoding_parameters(self):
        segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata))
        # this must be a multiple of self.required_shares
        segment_size = mathutil.next_multiple(segment_size,
                                              self.required_shares)
        self.segment_size = segment_size
        if segment_size:
            self.num_segments = mathutil.div_ceil(len(self.newdata),
                                                  segment_size)
        else:
            self.num_segments = 0
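        # Worked example (illustrative, not taken from real data): with
        # required_shares=3 and a 1000-byte file, segment_size is rounded up
        # from 1000 to 1002 (the next multiple of 3) and num_segments is 1.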
        assert self.num_segments in [0, 1,] # SDMF restrictions

    def _fatal_error(self, f):
        self.log("error during loop", failure=f, level=log.SCARY)
        self._done(f)

    def _update_status(self):
        self._status.set_status("Sending Shares: %d placed out of %d, "
                                "%d messages outstanding" %
                                (len(self.placed),
                                 len(self.goal),
                                 len(self.outstanding)))
        self._status.set_progress(1.0 * len(self.placed) / len(self.goal))

    def loop(self, ignored=None):
        self.log("entering loop", level=log.NOISY)
        if not self._running:
            return

        self.looplimit -= 1
        if self.looplimit <= 0:
            raise RuntimeError("loop limit exceeded")

        if self.surprised:
            # don't send out any new shares, just wait for the outstanding
            # ones to be retired.
            self.log("currently surprised, so don't send any new shares",
                     level=log.NOISY)
        else:
            self.update_goal()
            # how far are we from our goal?
            needed = self.goal - self.placed - self.outstanding
            self._update_status()

            if needed:
                # we need to send out new shares
                self.log(format="need to send %(needed)d new shares",
                         needed=len(needed), level=log.NOISY)
                self._send_shares(needed)
                return

        if self.outstanding:
            # queries are still pending, keep waiting
            self.log(format="%(outstanding)d queries still outstanding",
                     outstanding=len(self.outstanding),
                     level=log.NOISY)
            return

        # no queries outstanding, no placements needed: we're done
        self.log("no queries outstanding, no placements needed: done",
                 level=log.OPERATIONAL)
        now = time.time()
        elapsed = now - self._started_pushing
        self._status.timings["push"] = elapsed
        return self._done(None)

    def log_goal(self, goal, message=""):
        logmsg = [message]
        for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]):
            logmsg.append("sh%d to [%s]" % (shnum,
                                            idlib.shortnodeid_b2a(peerid)))
        self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
        self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
                 level=log.NOISY)

    def update_goal(self):
        # if log.recording_noisy
        if True:
            self.log_goal(self.goal, "before update: ")

        # first, remove any bad peers from our goal
        self.goal = set([ (peerid, shnum)
                          for (peerid, shnum) in self.goal
                          if peerid not in self.bad_peers ])

        # find the homeless shares:
        homefull_shares = set([shnum for (peerid, shnum) in self.goal])
        homeless_shares = set(range(self.total_shares)) - homefull_shares
        homeless_shares = sorted(list(homeless_shares))
        # place them somewhere. We prefer unused servers at the beginning of
        # the available peer list.

        if not homeless_shares:
            return

        # if an old share X is on a node, put the new share X there too.
        # TODO: 1: redistribute shares to achieve one-per-peer, by copying
        #       shares from existing peers to new (less-crowded) ones. The
        #       old shares must still be updated.
        # TODO: 2: move those shares instead of copying them, to reduce future
        #       update work

        # this is a bit CPU intensive but easy to analyze. We create a sort
        # order for each peerid. If the peerid is marked as bad, we don't
        # even put them in the list. Then we care about the number of shares
        # which have already been assigned to them. After that we care about
        # their permutation order.
        old_assignments = DictOfSets()
        for (peerid, shnum) in self.goal:
            old_assignments.add(peerid, shnum)

        peerlist = []
        for i, (peerid, ss) in enumerate(self.full_peerlist):
            if peerid in self.bad_peers:
                continue
            entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
            peerlist.append(entry)
        peerlist.sort()

        if not peerlist:
            raise NotEnoughServersError("Ran out of non-bad servers")

        new_assignments = []
        # we then index this peerlist with an integer, because we may have to
        # wrap. We update the goal as we go.
        i = 0
        for shnum in homeless_shares:
            (ignored1, ignored2, peerid, ss) = peerlist[i]
            # TODO: if we are forced to send a share to a server that already
            # has one, we may have two write requests in flight, and the
            # servermap (which was computed before either request was sent)
            # won't reflect the new shares, so the second response will cause
            # us to be surprised ("unexpected share on peer"), causing the
            # publish to fail with an UncoordinatedWriteError. This is
            # troublesome but not really a big problem. Fix it at some point.
            self.goal.add( (peerid, shnum) )
            self.connections[peerid] = ss
            i += 1
            if i >= len(peerlist):
                i = 0
        if True:
            self.log_goal(self.goal, "after update: ")



    def _encrypt_and_encode(self):
        # this returns a Deferred that fires with a list of (sharedata,
        # sharenum) tuples. TODO: cache the ciphertext, only produce the
        # shares that we care about.
        self.log("_encrypt_and_encode")

        self._status.set_status("Encrypting")
        started = time.time()

        key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
        enc = AES(key)
        crypttext = enc.process(self.newdata)
        assert len(crypttext) == len(self.newdata)

        now = time.time()
        self._status.timings["encrypt"] = now - started
        started = now

        # now apply FEC

        self._status.set_status("Encoding")
        fec = codec.CRSEncoder()
        fec.set_params(self.segment_size,
                       self.required_shares, self.total_shares)
        piece_size = fec.get_block_size()
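        # (For the CRS codec the block size should come out to roughly
        # segment_size / required_shares -- e.g. a 1002-byte segment with
        # k=3 gives 334-byte pieces -- but the exact rounding is the codec's
        # business, so we just ask it.)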
        crypttext_pieces = [None] * self.required_shares
        for i in range(len(crypttext_pieces)):
            offset = i * piece_size
            piece = crypttext[offset:offset+piece_size]
            piece = piece + "\x00"*(piece_size - len(piece)) # padding
            crypttext_pieces[i] = piece
            assert len(piece) == piece_size

        d = fec.encode(crypttext_pieces)
        def _done_encoding(res):
            elapsed = time.time() - started
            self._status.timings["encode"] = elapsed
            return res
        d.addCallback(_done_encoding)
        return d

    def _generate_shares(self, shares_and_shareids):
        # this sets self.shares and self.root_hash
        self.log("_generate_shares")
        self._status.set_status("Generating Shares")
        started = time.time()

        # we should know these by now
        privkey = self._privkey
        encprivkey = self._encprivkey
        pubkey = self._pubkey

        (shares, share_ids) = shares_and_shareids

        assert len(shares) == len(share_ids)
        assert len(shares) == self.total_shares
        all_shares = {}
        block_hash_trees = {}
        share_hash_leaves = [None] * len(shares)
        for i in range(len(shares)):
            share_data = shares[i]
            shnum = share_ids[i]
            all_shares[shnum] = share_data

            # build the block hash tree. SDMF has only one leaf.
            leaves = [hashutil.block_hash(share_data)]
            t = hashtree.HashTree(leaves)
            block_hash_trees[shnum] = block_hash_tree = list(t)
            share_hash_leaves[shnum] = t[0]
        for leaf in share_hash_leaves:
            assert leaf is not None
        share_hash_tree = hashtree.HashTree(share_hash_leaves)
        share_hash_chain = {}
        for shnum in range(self.total_shares):
            needed_hashes = share_hash_tree.needed_hashes(shnum)
            share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
                                              for i in needed_hashes ] )
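        # (Descriptive note: share_hash_chain[shnum] maps hash-tree node
        # indices to hash values -- the internal nodes a reader needs in
        # order to check share #shnum's leaf, which is the root of that
        # share's block hash tree, against root_hash.)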
        root_hash = share_hash_tree[0]
        assert len(root_hash) == 32
        self.log("my new root_hash is %s" % base32.b2a(root_hash))

        prefix = pack_prefix(self._new_seqnum, root_hash, self.salt,
                             self.required_shares, self.total_shares,
                             self.segment_size, len(self.newdata))

        # now pack the beginning of the share. All shares are the same up
        # to the signature, then they have divergent share hash chains,
        # then completely different block hash trees + salt + share data,
        # then they all share the same encprivkey at the end. The sizes
        # of everything are the same for all shares.
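        # (Roughly, in the order of the pack_share() arguments below: prefix,
        # verification key, signature, share hash chain, block hash tree,
        # share data, and finally the encrypted private key. layout.py is the
        # authority on the exact on-disk offsets.)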

        sign_started = time.time()
        signature = privkey.sign(prefix)
        self._status.timings["sign"] = time.time() - sign_started

        verification_key = pubkey.serialize()

        final_shares = {}
        for shnum in range(self.total_shares):
            final_share = pack_share(prefix,
                                     verification_key,
                                     signature,
                                     share_hash_chain[shnum],
                                     block_hash_trees[shnum],
                                     all_shares[shnum],
                                     encprivkey)
            final_shares[shnum] = final_share
        elapsed = time.time() - started
        self._status.timings["pack"] = elapsed
        self.shares = final_shares
        self.root_hash = root_hash

        # we also need to build up the version identifier for what we're
        # pushing. Extract the offsets from one of our shares.
        assert final_shares
        offsets = unpack_header(final_shares.values()[0])[-1]
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
        verinfo = (self._new_seqnum, root_hash, self.salt,
                   self.segment_size, len(self.newdata),
                   self.required_shares, self.total_shares,
                   prefix, offsets_tuple)
        self.versioninfo = verinfo



    def _send_shares(self, needed):
        self.log("_send_shares")

        # we're finally ready to send out our shares. If we encounter any
        # surprises here, it's because somebody else is writing at the same
        # time. (Note: in the future, when we remove the _query_peers() step
        # and instead speculate about [or remember] which shares are where,
        # surprises here are *not* indications of UncoordinatedWriteError,
        # and we'll need to respond to them more gracefully.)

        # needed is a set of (peerid, shnum) tuples. The first thing we do is
        # organize it by peerid.

        peermap = DictOfSets()
        for (peerid, shnum) in needed:
            peermap.add(peerid, shnum)

        # the next thing is to build up a bunch of test vectors. The
        # semantics of Publish are that we perform the operation if the world
        # hasn't changed since the ServerMap was constructed (more or less).
        # For every share we're trying to place, we create a test vector that
        # tests to see if the server*share still corresponds to the
        # map.

        all_tw_vectors = {} # maps peerid to tw_vectors
        sm = self._servermap.servermap

        for key in needed:
            (peerid, shnum) = key

            if key in sm:
                # an old version of that share already exists on the
                # server, according to our servermap. We will create a
                # request that attempts to replace it.
                old_versionid, old_timestamp = sm[key]
                (old_seqnum, old_root_hash, old_salt, old_segsize,
                 old_datalength, old_k, old_N, old_prefix,
                 old_offsets_tuple) = old_versionid
                old_checkstring = pack_checkstring(old_seqnum,
                                                   old_root_hash,
                                                   old_salt)
                testv = (0, len(old_checkstring), "eq", old_checkstring)
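                # i.e. a test vector of the form (offset, length, operator,
                # specimen): the server only applies our write if the bytes
                # currently at [0:len(old_checkstring)] compare "eq"ual to
                # the checkstring (seqnum, root_hash, salt) we last observed,
                # making the whole request a remote test-and-set.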

            else:
                # add a testv that requires the share not exist
                #testv = (0, 1, 'eq', "")

                # Unfortunately, foolscap-0.2.5 has a bug in the way inbound
                # constraints are handled. If the same object is referenced
                # multiple times inside the arguments, foolscap emits a
                # 'reference' token instead of a distinct copy of the
                # argument. The bug is that these 'reference' tokens are not
                # accepted by the inbound constraint code. To work around
                # this, we need to prevent python from interning the
                # (constant) tuple, by creating a new copy of this vector
                # each time. This bug is fixed in later versions of foolscap.
                testv = tuple([0, 1, 'eq', ""])

            testvs = [testv]
            # the write vector is simply the share
            writev = [(0, self.shares[shnum])]

            if peerid not in all_tw_vectors:
                all_tw_vectors[peerid] = {}
                # maps shnum to (testvs, writevs, new_length)
            assert shnum not in all_tw_vectors[peerid]

            all_tw_vectors[peerid][shnum] = (testvs, writev, None)
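            # so all_tw_vectors ends up shaped like (illustrative):
            #   { peerid: { shnum: ( [ (offset, length, op, specimen), ... ],
            #                        [ (offset, data), ... ],
            #                        new_length or None ) } }
            # and the inner per-peer dict is the tw_vectors argument we pass
            # to slot_testv_and_readv_and_writev in _do_testreadwrite below.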

        # we read the checkstring back from each share, however we only use
        # it to detect whether there was a new share that we didn't know
        # about. The success or failure of the write will tell us whether
        # there was a collision or not. If there is a collision, the first
        # thing we'll do is update the servermap, which will find out what
        # happened. We could conceivably reduce a roundtrip by using the
        # readv checkstring to populate the servermap, but really we'd have
        # to read enough data to validate the signatures too, so it wouldn't
        # be an overall win.
        read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]
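        # (Read vectors are (offset, length) pairs; this one asks each server
        # to return the signed prefix of whatever share it currently holds.
        # Its leading fields are the (seqnum, root_hash, salt) checkstring
        # that _got_write_answer unpacks below.)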

        # ok, send the messages!
        self.log("sending %d shares" % len(all_tw_vectors), level=log.NOISY)
        started = time.time()
        for (peerid, tw_vectors) in all_tw_vectors.items():

            write_enabler = self._node.get_write_enabler(peerid)
            renew_secret = self._node.get_renewal_secret(peerid)
            cancel_secret = self._node.get_cancel_secret(peerid)
            secrets = (write_enabler, renew_secret, cancel_secret)
            shnums = tw_vectors.keys()

            for shnum in shnums:
                self.outstanding.add( (peerid, shnum) )

            d = self._do_testreadwrite(peerid, secrets,
                                       tw_vectors, read_vector)
            d.addCallbacks(self._got_write_answer, self._got_write_error,
                           callbackArgs=(peerid, shnums, started),
                           errbackArgs=(peerid, shnums, started))
            d.addCallback(self.loop)
            d.addErrback(self._fatal_error)

        self._update_status()
        self.log("%d shares sent" % len(all_tw_vectors), level=log.NOISY)

    def _do_testreadwrite(self, peerid, secrets,
                          tw_vectors, read_vector):
        storage_index = self._storage_index
        ss = self.connections[peerid]

        #print "SS[%s] is %s" % (idlib.shortnodeid_b2a(peerid), ss), ss.tracker.interfaceName
        d = ss.callRemote("slot_testv_and_readv_and_writev",
                          storage_index,
                          secrets,
                          tw_vectors,
                          read_vector)
        return d

    def _got_write_answer(self, answer, peerid, shnums, started):
        lp = self.log("_got_write_answer from %s" %
                      idlib.shortnodeid_b2a(peerid))
        for shnum in shnums:
            self.outstanding.discard( (peerid, shnum) )

        now = time.time()
        elapsed = now - started
        self._status.add_per_server_time(peerid, elapsed)

        wrote, read_data = answer

        surprise_shares = set(read_data.keys()) - set(shnums)
        if surprise_shares:
            self.log("they had shares %s that we didn't know about" %
                     (list(surprise_shares),),
                     parent=lp, level=log.WEIRD)
            self.surprised = True

        if not wrote:
            # TODO: there are two possibilities. The first is that the server
            # is full (or just doesn't want to give us any room), which means
            # we shouldn't ask them again, but is *not* an indication of an
            # uncoordinated write. The second is that our testv failed, which
            # *does* indicate an uncoordinated write. We currently don't have
            # a way to tell these two apart (in fact, the storage server code
            # doesn't have the option of refusing our share).
            #
            # If the server is full, mark the peer as bad (so we don't ask
            # them again), but don't set self.surprised. The loop() will find
            # a new server.
            #
            # If the testv failed, log it, set self.surprised, but don't
            # bother adding to self.bad_peers .

            self.log("our testv failed, so the write did not happen",
                     parent=lp, level=log.WEIRD)
            self.surprised = True
            self.bad_peers.add(peerid) # don't ask them again
            # use the checkstring to add information to the log message
            for (shnum,readv) in read_data.items():
                checkstring = readv[0]
                (other_seqnum,
                 other_roothash,
                 other_salt) = unpack_checkstring(checkstring)
                expected_version = self._servermap.version_on_peer(peerid,
                                                                   shnum)
                if expected_version:
                    (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                     offsets_tuple) = expected_version
                    self.log("somebody modified the share on us:"
                             " shnum=%d: I thought they had #%d:R=%s,"
                             " but testv reported #%d:R=%s" %
                             (shnum,
                              seqnum, base32.b2a(root_hash)[:4],
                              other_seqnum, base32.b2a(other_roothash)[:4]),
                             parent=lp, level=log.NOISY)
                # if expected_version==None, then we didn't expect to see a
                # share on that peer, and the 'surprise_shares' clause above
                # will have logged it.
            # self.loop() will take care of finding new homes
            return

        for shnum in shnums:
            self.placed.add( (peerid, shnum) )
            # and update the servermap
            self._servermap.add_new_share(peerid, shnum,
                                          self.versioninfo, started)

        # self.loop() will take care of checking to see if we're done
        return

    def _got_write_error(self, f, peerid, shnums, started):
        for shnum in shnums:
            self.outstanding.discard( (peerid, shnum) )
        self.bad_peers.add(peerid)
        self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s",
                 shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid),
                 failure=f,
                 level=log.UNUSUAL)
        # self.loop() will take care of checking to see if we're done
        return


    def _done(self, res):
        if not self._running:
            return
        self._running = False
        now = time.time()
        self._status.timings["total"] = now - self._started
        self._status.set_active(False)
        if isinstance(res, failure.Failure):
            self.log("Publish done, with failure", failure=res, level=log.WEIRD)
            self._status.set_status("Failed")
        elif self.surprised:
            self.log("Publish done, UncoordinatedWriteError", level=log.UNUSUAL)
            self._status.set_status("UncoordinatedWriteError")
            # deliver a failure
            res = failure.Failure(UncoordinatedWriteError())
            # TODO: recovery
        else:
            self.log("Publish done, success")
            self._status.set_status("Done")
            self._status.set_progress(1.0)
        eventually(self.done_deferred.callback, res)
