

import os, struct, time
from itertools import count
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from allmydata.interfaces import IPublishStatus, FileTooLargeError
from allmydata.util import base32, hashutil, mathutil, idlib, log
from allmydata import hashtree, codec, storage
from pycryptopp.cipher.aes import AES
from foolscap.eventual import eventually

from common import MODE_WRITE, MODE_CHECK, DictOfSets, \
     UncoordinatedWriteError, NotEnoughServersError
from servermap import ServerMap
from layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
     unpack_checkstring, SIGNED_PREFIX

class PublishStatus:
    implements(IPublishStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["send_per_server"] = {}
        self.servermap = None
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        self.encoding = ("?", "?")
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def add_per_server_time(self, peerid, elapsed):
        if peerid not in self.timings["send_per_server"]:
            self.timings["send_per_server"][peerid] = []
        self.timings["send_per_server"][peerid].append(elapsed)

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_encoding(self):
        return self.encoding
    def using_helper(self):
        return self.helper
    def get_servermap(self):
        return self.servermap
    def get_size(self):
        return self.size
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
        self.helper = helper
    def set_servermap(self, servermap):
        self.servermap = servermap
    def set_encoding(self, k, n):
        self.encoding = (k, n)
    def set_size(self, size):
        self.size = size
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value

class Publish:
    """I represent a single act of publishing the mutable file to the grid. I
    will only publish my data if the servermap I am using still represents
    the current state of the world.

    To make the initial publish, set servermap to None.
    """
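
    # A brief usage sketch (illustrative only; names other than Publish and
    # publish are placeholders, not part of this module):
    #
    #   smap = ...                    # a ServerMap updated in MODE_WRITE
    #   p = Publish(filenode, smap)
    #   d = p.publish(new_contents)   # Deferred fires when publishing is done
    #
    # For the very first version of a brand-new mutable file, pass
    # servermap=None and Publish will start at seqnum 1.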

    # we limit the segment size as usual to constrain our memory footprint.
    # The max segsize is higher for mutable files, because we want to support
    # dirnodes with up to 10k children, and each child uses about 330 bytes.
    # If you actually put that much into a directory you'll be using a
    # footprint of around 14MB, which is higher than we'd like, but it is
    # more important right now to support large directories than to make
    # memory usage small when you use them. Once we implement MDMF (with
    # multiple segments), we will drop this back down, probably to 128KiB.
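    # (Rough arithmetic, for illustration: 10,000 children at ~330 bytes each
    # is about 3.3MB of directory contents, just under the 3.5MB segment
    # limit below; the ~14MB footprint estimate presumably reflects the
    # several whole-file copies (plaintext, ciphertext, encoded shares) held
    # in memory at once during a publish.)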
    MAX_SEGMENT_SIZE = 3500000

    def __init__(self, filenode, servermap):
        self._node = filenode
        self._servermap = servermap
        self._storage_index = self._node.get_storage_index()
        self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
        num = self._node._client.log("Publish(%s): starting" % prefix)
        self._log_number = num
        self._running = True

        self._status = PublishStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if 'parent' not in kwargs:
            kwargs['parent'] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.publish"
        return log.msg(*args, **kwargs)

    def publish(self, newdata):
        """Publish the filenode's current contents.  Returns a Deferred that
        fires (with None) when the publish has done as much work as it's ever
        going to do, or errbacks with UncoordinatedWriteError if it detects a
        simultaneous write.
        """

        # 1: generate shares (SDMF: files are small, so we can do it in RAM)
        # 2: perform peer selection, get candidate servers
        #  2a: send queries to n+epsilon servers, to determine current shares
        #  2b: based upon responses, create target map
        # 3: send slot_testv_and_readv_and_writev messages
        # 4: as responses return, update share-dispatch table
        # 4a: may need to run recovery algorithm
        # 5: when enough responses are back, we're done

        self.log("starting publish, datalen is %s" % len(newdata))
        if len(newdata) > self.MAX_SEGMENT_SIZE:
            raise FileTooLargeError("SDMF is limited to one segment, and "
                                    "%d > %d" % (len(newdata),
                                                 self.MAX_SEGMENT_SIZE))
        self._status.set_size(len(newdata))
        self._status.set_status("Started")
        self._started = time.time()

        self.done_deferred = defer.Deferred()

        self._writekey = self._node.get_writekey()
        assert self._writekey, "need write capability to publish"

        # first, which servers will we publish to? We require that the
        # servermap was updated in MODE_WRITE (or MODE_CHECK), so we can
        # depend upon the peerlist computed by that process instead of
        # computing our own.
        if self._servermap:
            assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
            # we will push a version that is one larger than anything present
            # in the grid, according to the servermap.
            self._new_seqnum = self._servermap.highest_seqnum() + 1
        else:
            # If we don't have a servermap, that's because we're doing the
            # initial publish
            self._new_seqnum = 1
            self._servermap = ServerMap()
        self._status.set_servermap(self._servermap)

        self.log(format="new seqnum will be %(seqnum)d",
                 seqnum=self._new_seqnum, level=log.NOISY)

        # having an up-to-date servermap (or using a filenode that was just
        # created for the first time) also guarantees that the following
        # fields are available
        self.readkey = self._node.get_readkey()
        self.required_shares = self._node.get_required_shares()
        assert self.required_shares is not None
        self.total_shares = self._node.get_total_shares()
        assert self.total_shares is not None
        self._status.set_encoding(self.required_shares, self.total_shares)

        self._pubkey = self._node.get_pubkey()
        assert self._pubkey
        self._privkey = self._node.get_privkey()
        assert self._privkey
        self._encprivkey = self._node.get_encprivkey()

        client = self._node._client
        full_peerlist = client.get_permuted_peers("storage",
                                                  self._storage_index)
        self.full_peerlist = full_peerlist # for use later, immutable
        self.bad_peers = set() # peerids who have errbacked/refused requests

        self.newdata = newdata
        self.salt = os.urandom(16)

        self.setup_encoding_parameters()

        # if we experience any surprises (writes which were rejected because
        # our test vector did not match, or shares which we didn't expect to
        # see), we set this flag and report an UncoordinatedWriteError at the
        # end of the publish process.
        self.surprised = False

        # as a failsafe, refuse to iterate through self.loop more than a
        # thousand times.
        self.looplimit = 1000

        # we keep track of three tables. The first is our goal: which shares
        # we want to see on which servers. This is initially populated by the
        # existing servermap.
        self.goal = set() # pairs of (peerid, shnum) tuples

        # the second table is our list of outstanding queries: those which
        # are in flight and may or may not be delivered, accepted, or
        # acknowledged. Items are added to this table when the request is
        # sent, and removed when the response returns (or errbacks).
        self.outstanding = set() # (peerid, shnum) tuples

        # the third is a table of successes: shares which have actually been
        # placed. These are populated when responses come back with success.
        # When self.placed == self.goal, we're done.
        self.placed = set() # (peerid, shnum) tuples

        # we also keep a mapping from peerid to RemoteReference. Each time we
        # pull a connection out of the full peerlist, we add it to this for
        # use later.
        self.connections = {}

        self.bad_share_checkstrings = {}

        # we use the servermap to populate the initial goal: this way we will
        # try to update each existing share in place.
        for (peerid, shnum) in self._servermap.servermap:
            self.goal.add( (peerid, shnum) )
            self.connections[peerid] = self._servermap.connections[peerid]
        # then we add in all the shares that were bad (corrupted, bad
        # signatures, etc). We want to replace these.
        for key, old_checkstring in self._servermap.bad_shares.items():
            (peerid, shnum) = key
            self.goal.add(key)
            self.bad_share_checkstrings[key] = old_checkstring
            self.connections[peerid] = self._servermap.connections[peerid]

        # create the shares. We'll discard these as they are delivered. SDMF:
        # we're allowed to hold everything in memory.

        self._status.timings["setup"] = time.time() - self._started
        d = self._encrypt_and_encode()
        d.addCallback(self._generate_shares)
        def _start_pushing(res):
            self._started_pushing = time.time()
            return res
        d.addCallback(_start_pushing)
        d.addCallback(self.loop) # trigger delivery
        d.addErrback(self._fatal_error)

        return self.done_deferred

    def setup_encoding_parameters(self):
        segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata))
        # this must be a multiple of self.required_shares
        segment_size = mathutil.next_multiple(segment_size,
                                              self.required_shares)
        self.segment_size = segment_size
        if segment_size:
            self.num_segments = mathutil.div_ceil(len(self.newdata),
                                                  segment_size)
        else:
            self.num_segments = 0
        assert self.num_segments in [0, 1,] # SDMF restrictions
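        # Illustrative numbers (not from the original comments): with
        # required_shares=3 and a 10-byte file, next_multiple rounds the
        # segment size up to 12 and div_ceil(10, 12) gives a single segment,
        # so an SDMF file always ends up with 0 segments (empty) or 1.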

    def _fatal_error(self, f):
        self.log("error during loop", failure=f, level=log.UNUSUAL)
        self._done(f)

    def _update_status(self):
        self._status.set_status("Sending Shares: %d placed out of %d, "
                                "%d messages outstanding" %
                                (len(self.placed),
                                 len(self.goal),
                                 len(self.outstanding)))
        self._status.set_progress(1.0 * len(self.placed) / len(self.goal))

    def loop(self, ignored=None):
        self.log("entering loop", level=log.NOISY)
        if not self._running:
            return

        self.looplimit -= 1
        if self.looplimit <= 0:
            raise RuntimeError("loop limit exceeded")

        if self.surprised:
            # don't send out any new shares, just wait for the outstanding
            # ones to be retired.
            self.log("currently surprised, so don't send any new shares",
                     level=log.NOISY)
        else:
            self.update_goal()
            # how far are we from our goal?
            needed = self.goal - self.placed - self.outstanding
            self._update_status()

            if needed:
                # we need to send out new shares
                self.log(format="need to send %(needed)d new shares",
                         needed=len(needed), level=log.NOISY)
                self._send_shares(needed)
                return

        if self.outstanding:
            # queries are still pending, keep waiting
            self.log(format="%(outstanding)d queries still outstanding",
                     outstanding=len(self.outstanding),
                     level=log.NOISY)
            return

        # no queries outstanding, no placements needed: we're done
        self.log("no queries outstanding, no placements needed: done",
                 level=log.OPERATIONAL)
        now = time.time()
        elapsed = now - self._started_pushing
        self._status.timings["push"] = elapsed
        return self._done(None)

    def log_goal(self, goal, message=""):
        logmsg = [message]
        for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]):
            logmsg.append("sh%d to [%s]" % (shnum,
                                            idlib.shortnodeid_b2a(peerid)))
        self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
        self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
                 level=log.NOISY)

    def update_goal(self):
        # if log.recording_noisy
        if True:
            self.log_goal(self.goal, "before update: ")

        # first, remove any bad peers from our goal
        self.goal = set([ (peerid, shnum)
                          for (peerid, shnum) in self.goal
                          if peerid not in self.bad_peers ])

        # find the homeless shares:
        homefull_shares = set([shnum for (peerid, shnum) in self.goal])
        homeless_shares = set(range(self.total_shares)) - homefull_shares
        homeless_shares = sorted(list(homeless_shares))
        # place them somewhere. We prefer unused servers at the beginning of
        # the available peer list.

        if not homeless_shares:
            return

        # if an old share X is on a node, put the new share X there too.
        # TODO: 1: redistribute shares to achieve one-per-peer, by copying
        #       shares from existing peers to new (less-crowded) ones. The
        #       old shares must still be updated.
        # TODO: 2: move those shares instead of copying them, to reduce future
        #       update work

        # this is a bit CPU intensive but easy to analyze. We create a sort
        # order for each peerid. If the peerid is marked as bad, we don't
        # even put them in the list. Then we care about the number of shares
        # which have already been assigned to them. After that we care about
        # their permutation order.
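        # For example (hypothetical peers): the entry (0, 7, peeridA, ssA)
        # sorts ahead of (1, 2, peeridB, ssB), so a peer holding no shares
        # yet is chosen before one that already holds a share, even if the
        # latter comes earlier in the permuted list; ties are broken by
        # permuted order.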
        old_assignments = DictOfSets()
        for (peerid, shnum) in self.goal:
            old_assignments.add(peerid, shnum)

        peerlist = []
        for i, (peerid, ss) in enumerate(self.full_peerlist):
            if peerid in self.bad_peers:
                continue
            entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
            peerlist.append(entry)
        peerlist.sort()

        if not peerlist:
            raise NotEnoughServersError("Ran out of non-bad servers")

        new_assignments = []
        # we then index this peerlist with an integer, because we may have to
        # wrap. We update the goal as we go.
        i = 0
        for shnum in homeless_shares:
            (ignored1, ignored2, peerid, ss) = peerlist[i]
            # TODO: if we are forced to send a share to a server that already
            # has one, we may have two write requests in flight, and the
            # servermap (which was computed before either request was sent)
            # won't reflect the new shares, so the second response will cause
            # us to be surprised ("unexpected share on peer"), causing the
            # publish to fail with an UncoordinatedWriteError. This is
            # troublesome but not really a big problem. Fix it at some point.
            self.goal.add( (peerid, shnum) )
            self.connections[peerid] = ss
            i += 1
            if i >= len(peerlist):
                i = 0
        if True:
            self.log_goal(self.goal, "after update: ")



    def _encrypt_and_encode(self):
        # this returns a Deferred that fires with a list of (sharedata,
        # sharenum) tuples. TODO: cache the ciphertext, only produce the
        # shares that we care about.
        self.log("_encrypt_and_encode")

        self._status.set_status("Encrypting")
        started = time.time()

        key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
        enc = AES(key)
        crypttext = enc.process(self.newdata)
        assert len(crypttext) == len(self.newdata)

        now = time.time()
        self._status.timings["encrypt"] = now - started
        started = now

        # now apply FEC

        self._status.set_status("Encoding")
        fec = codec.CRSEncoder()
        fec.set_params(self.segment_size,
                       self.required_shares, self.total_shares)
        piece_size = fec.get_block_size()
        crypttext_pieces = [None] * self.required_shares
        for i in range(len(crypttext_pieces)):
            offset = i * piece_size
            piece = crypttext[offset:offset+piece_size]
            piece = piece + "\x00"*(piece_size - len(piece)) # padding
            crypttext_pieces[i] = piece
            assert len(piece) == piece_size

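        # An illustrative example (assuming get_block_size() returns
        # segment_size/required_shares once segment_size has been rounded up
        # to a multiple of k): with segment_size=12 and required_shares=3,
        # piece_size is 4, and a 10-byte crypttext is split into pieces of
        # 4, 4, and 2 bytes, the last padded with two NUL bytes.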
        d = fec.encode(crypttext_pieces)
        def _done_encoding(res):
            elapsed = time.time() - started
            self._status.timings["encode"] = elapsed
            return res
        d.addCallback(_done_encoding)
        return d

    def _generate_shares(self, shares_and_shareids):
        # this sets self.shares and self.root_hash
        self.log("_generate_shares")
        self._status.set_status("Generating Shares")
        started = time.time()

        # we should know these by now
        privkey = self._privkey
        encprivkey = self._encprivkey
        pubkey = self._pubkey

        (shares, share_ids) = shares_and_shareids

        assert len(shares) == len(share_ids)
        assert len(shares) == self.total_shares
        all_shares = {}
        block_hash_trees = {}
        share_hash_leaves = [None] * len(shares)
        for i in range(len(shares)):
            share_data = shares[i]
            shnum = share_ids[i]
            all_shares[shnum] = share_data

            # build the block hash tree. SDMF has only one leaf.
            leaves = [hashutil.block_hash(share_data)]
            t = hashtree.HashTree(leaves)
            block_hash_trees[shnum] = block_hash_tree = list(t)
            share_hash_leaves[shnum] = t[0]
        for leaf in share_hash_leaves:
            assert leaf is not None
        share_hash_tree = hashtree.HashTree(share_hash_leaves)
        share_hash_chain = {}
        for shnum in range(self.total_shares):
            needed_hashes = share_hash_tree.needed_hashes(shnum)
            share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
                                              for i in needed_hashes ] )
        root_hash = share_hash_tree[0]
        assert len(root_hash) == 32
        self.log("my new root_hash is %s" % base32.b2a(root_hash))

        prefix = pack_prefix(self._new_seqnum, root_hash, self.salt,
                             self.required_shares, self.total_shares,
                             self.segment_size, len(self.newdata))

        # now pack the beginning of the share. All shares are the same up
        # to the signature, then they have divergent share hash chains,
        # then completely different block hash trees + salt + share data,
        # then they all share the same encprivkey at the end. The sizes
        # of everything are the same for all shares.

        sign_started = time.time()
        signature = privkey.sign(prefix)
        self._status.timings["sign"] = time.time() - sign_started

        verification_key = pubkey.serialize()

        final_shares = {}
        for shnum in range(self.total_shares):
            final_share = pack_share(prefix,
                                     verification_key,
                                     signature,
                                     share_hash_chain[shnum],
                                     block_hash_trees[shnum],
                                     all_shares[shnum],
                                     encprivkey)
            final_shares[shnum] = final_share
        elapsed = time.time() - started
        self._status.timings["pack"] = elapsed
        self.shares = final_shares
        self.root_hash = root_hash

        # we also need to build up the version identifier for what we're
        # pushing. Extract the offsets from one of our shares.
        assert final_shares
        offsets = unpack_header(final_shares.values()[0])[-1]
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
        verinfo = (self._new_seqnum, root_hash, self.salt,
                   self.segment_size, len(self.newdata),
                   self.required_shares, self.total_shares,
                   prefix, offsets_tuple)
        self.versioninfo = verinfo



    def _send_shares(self, needed):
        self.log("_send_shares")

        # we're finally ready to send out our shares. If we encounter any
        # surprises here, it's because somebody else is writing at the same
        # time. (Note: in the future, when we remove the _query_peers() step
        # and instead speculate about [or remember] which shares are where,
        # surprises here are *not* indications of UncoordinatedWriteError,
        # and we'll need to respond to them more gracefully.)

        # needed is a set of (peerid, shnum) tuples. The first thing we do is
        # organize it by peerid.

        peermap = DictOfSets()
        for (peerid, shnum) in needed:
            peermap.add(peerid, shnum)

        # the next thing is to build up a bunch of test vectors. The
        # semantics of Publish are that we perform the operation if the world
        # hasn't changed since the ServerMap was constructed (more or less).
        # For every share we're trying to place, we create a test vector that
        # tests to see if the server*share still corresponds to the
        # map.
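        # As an illustration (hypothetical values), one entry of the
        # all_tw_vectors structure built below looks like:
        #   all_tw_vectors[peerid][shnum] =
        #     ( [(0, checkstring_length, "eq", expected_checkstring)], # testvs
        #       [(0, new_share_data)],                                 # writev
        #       None )                                                 # new_length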

        all_tw_vectors = {} # maps peerid to tw_vectors
        sm = self._servermap.servermap

        for key in needed:
            (peerid, shnum) = key

            if key in sm:
                # an old version of that share already exists on the
                # server, according to our servermap. We will create a
                # request that attempts to replace it.
                old_versionid, old_timestamp = sm[key]
                (old_seqnum, old_root_hash, old_salt, old_segsize,
                 old_datalength, old_k, old_N, old_prefix,
                 old_offsets_tuple) = old_versionid
                old_checkstring = pack_checkstring(old_seqnum,
                                                   old_root_hash,
                                                   old_salt)
                testv = (0, len(old_checkstring), "eq", old_checkstring)

            elif key in self.bad_share_checkstrings:
                old_checkstring = self.bad_share_checkstrings[key]
                testv = (0, len(old_checkstring), "eq", old_checkstring)

            else:
                # add a testv that requires the share not exist
                testv = (0, 1, 'eq', "")

            testvs = [testv]
            # the write vector is simply the share
            writev = [(0, self.shares[shnum])]

            if peerid not in all_tw_vectors:
                all_tw_vectors[peerid] = {}
                # maps shnum to (testvs, writevs, new_length)
            assert shnum not in all_tw_vectors[peerid]

            all_tw_vectors[peerid][shnum] = (testvs, writev, None)

        # we read the checkstring back from each share, but we only use it to
        # detect whether there was a new share that we didn't know about. The
        # success or failure of the write will tell us whether there was a
        # collision or not. If there is a collision, the first thing we'll do
        # is update the servermap, which will find out what happened. We
        # could conceivably reduce a roundtrip by using the readv checkstring
        # to populate the servermap, but really we'd have to read enough data
        # to validate the signatures too, so it wouldn't be an overall win.
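        # (The data read back here is the start of the signed prefix; its
        # leading fields (seqnum, root hash, salt) form the checkstring that
        # unpack_checkstring() pulls apart in _got_write_answer below.)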
        read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]

        # ok, send the messages!
        self.log("sending %d shares" % len(all_tw_vectors), level=log.NOISY)
        started = time.time()
        for (peerid, tw_vectors) in all_tw_vectors.items():

            write_enabler = self._node.get_write_enabler(peerid)
            renew_secret = self._node.get_renewal_secret(peerid)
            cancel_secret = self._node.get_cancel_secret(peerid)
            secrets = (write_enabler, renew_secret, cancel_secret)
            shnums = tw_vectors.keys()

            for shnum in shnums:
                self.outstanding.add( (peerid, shnum) )

            d = self._do_testreadwrite(peerid, secrets,
                                       tw_vectors, read_vector)
            d.addCallbacks(self._got_write_answer, self._got_write_error,
                           callbackArgs=(peerid, shnums, started),
                           errbackArgs=(peerid, shnums, started))
            d.addCallback(self.loop)
            d.addErrback(self._fatal_error)

        self._update_status()
        self.log("%d shares sent" % len(all_tw_vectors), level=log.NOISY)

    def _do_testreadwrite(self, peerid, secrets,
                          tw_vectors, read_vector):
        storage_index = self._storage_index
        ss = self.connections[peerid]

        #print "SS[%s] is %s" % (idlib.shortnodeid_b2a(peerid), ss), ss.tracker.interfaceName
        d = ss.callRemote("slot_testv_and_readv_and_writev",
                          storage_index,
                          secrets,
                          tw_vectors,
                          read_vector)
        return d

    def _got_write_answer(self, answer, peerid, shnums, started):
        lp = self.log("_got_write_answer from %s" %
                      idlib.shortnodeid_b2a(peerid))
        for shnum in shnums:
            self.outstanding.discard( (peerid, shnum) )

        now = time.time()
        elapsed = now - started
        self._status.add_per_server_time(peerid, elapsed)

        wrote, read_data = answer

        surprise_shares = set(read_data.keys()) - set(shnums)
        if surprise_shares:
            self.log("they had shares %s that we didn't know about" %
                     (list(surprise_shares),),
                     parent=lp, level=log.WEIRD, umid="un9CSQ")
            self.surprised = True

        if not wrote:
            # TODO: there are two possibilities. The first is that the server
            # is full (or just doesn't want to give us any room), which means
            # we shouldn't ask them again, but is *not* an indication of an
            # uncoordinated write. The second is that our testv failed, which
            # *does* indicate an uncoordinated write. We currently don't have
            # a way to tell these two apart (in fact, the storage server code
            # doesn't have the option of refusing our share).
            #
            # If the server is full, mark the peer as bad (so we don't ask
            # them again), but don't set self.surprised. The loop() will find
            # a new server.
            #
            # If the testv failed, log it, set self.surprised, but don't
            # bother adding to self.bad_peers .

            self.log("our testv failed, so the write did not happen",
                     parent=lp, level=log.WEIRD, umid="8sc26g")
            self.surprised = True
            self.bad_peers.add(peerid) # don't ask them again
            # use the checkstring to add information to the log message
            for (shnum,readv) in read_data.items():
                checkstring = readv[0]
                (other_seqnum,
                 other_roothash,
                 other_salt) = unpack_checkstring(checkstring)
                expected_version = self._servermap.version_on_peer(peerid,
                                                                   shnum)
                if expected_version:
                    (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                     offsets_tuple) = expected_version
                    self.log("somebody modified the share on us:"
                             " shnum=%d: I thought they had #%d:R=%s,"
                             " but testv reported #%d:R=%s" %
                             (shnum,
                              seqnum, base32.b2a(root_hash)[:4],
                              other_seqnum, base32.b2a(other_roothash)[:4]),
                             parent=lp, level=log.NOISY)
                # if expected_version==None, then we didn't expect to see a
                # share on that peer, and the 'surprise_shares' clause above
                # will have logged it.
            # self.loop() will take care of finding new homes
            return

        for shnum in shnums:
            self.placed.add( (peerid, shnum) )
            # and update the servermap
            self._servermap.add_new_share(peerid, shnum,
                                          self.versioninfo, started)

        # self.loop() will take care of checking to see if we're done
        return

    def _got_write_error(self, f, peerid, shnums, started):
        for shnum in shnums:
            self.outstanding.discard( (peerid, shnum) )
        self.bad_peers.add(peerid)
        self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s",
                 shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid),
                 failure=f,
                 level=log.UNUSUAL)
        # self.loop() will take care of checking to see if we're done
        return


    def _done(self, res):
        if not self._running:
            return
        self._running = False
        now = time.time()
        self._status.timings["total"] = now - self._started
        self._status.set_active(False)
        if isinstance(res, failure.Failure):
            self.log("Publish done, with failure", failure=res,
                     level=log.WEIRD, umid="nRsR9Q")
            self._status.set_status("Failed")
        elif self.surprised:
            self.log("Publish done, UncoordinatedWriteError", level=log.UNUSUAL)
            self._status.set_status("UncoordinatedWriteError")
            # deliver a failure
            res = failure.Failure(UncoordinatedWriteError())
            # TODO: recovery
        else:
            self.log("Publish done, success")
            self._status.set_status("Done")
            self._status.set_progress(1.0)
        eventually(self.done_deferred.callback, res)
