]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/publish.py
acbe5cee6bae655ebb8df54abb9eda8bc39202b1
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / publish.py
1
2
3 import os, struct, time
4 from itertools import count
5 from zope.interface import implements
6 from twisted.internet import defer
7 from twisted.python import failure
8 from allmydata.interfaces import IPublishStatus
9 from allmydata.util import base32, hashutil, mathutil, idlib, log
10 from allmydata import hashtree, codec, storage
11 from pycryptopp.cipher.aes import AES
12 from foolscap.eventual import eventually
13
14 from common import MODE_WRITE, UncoordinatedWriteError, DictOfSets
15 from servermap import ServerMap
16 from layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
17      unpack_checkstring, SIGNED_PREFIX
18
class PublishStatus:
    """Status record for a single publish operation.

    Declares IPublishStatus. The record holds timing data plus the fields
    exposed through the get_*/set_* accessors below. Every instance takes
    the next value from the class-wide statusid_counter as its counter.
    """
    implements(IPublishStatus)
    statusid_counter = count(0)

    def __init__(self):
        # Timings are keyed by phase name. "send_per_server" is special: it
        # maps each peerid to a list of elapsed times (see
        # add_per_server_time).
        self.timings = {"send_per_server": {}}
        self.problems = {}

        # descriptive fields, filled in later through the setters
        self.storage_index = None
        self.servermap = None
        self.encoding = ("?", "?")
        self.size = None
        self.helper = False

        # progress reporting
        self.active = True
        self.status = "Not started"
        self.progress = 0.0

        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def add_per_server_time(self, peerid, elapsed):
        """Append 'elapsed' to the list of send times kept for 'peerid'."""
        per_server = self.timings["send_per_server"]
        per_server.setdefault(peerid, []).append(elapsed)

    # read accessors

    def get_started(self):
        return self.started

    def get_storage_index(self):
        return self.storage_index

    def get_encoding(self):
        return self.encoding

    def using_helper(self):
        return self.helper

    def get_servermap(self):
        return self.servermap

    def get_size(self):
        return self.size

    def get_status(self):
        return self.status

    def get_progress(self):
        return self.progress

    def get_active(self):
        return self.active

    def get_counter(self):
        return self.counter

    # write accessors

    def set_storage_index(self, si):
        self.storage_index = si

    def set_helper(self, helper):
        self.helper = helper

    def set_servermap(self, servermap):
        self.servermap = servermap

    def set_encoding(self, k, n):
        # stored as a (k, n) pair
        self.encoding = (k, n)

    def set_size(self, size):
        self.size = size

    def set_status(self, status):
        self.status = status

    def set_progress(self, value):
        self.progress = value

    def set_active(self, value):
        self.active = value
79
80 class Publish:
81     """I represent a single act of publishing the mutable file to the grid. I
82     will only publish my data if the servermap I am using still represents
83     the current state of the world.
84
85     To make the initial publish, set servermap to None.
86     """
87
88     # we limit the segment size as usual to constrain our memory footprint.
89     # The max segsize is higher for mutable files, because we want to support
90     # dirnodes with up to 10k children, and each child uses about 330 bytes.
91     # If you actually put that much into a directory you'll be using a
92     # footprint of around 14MB, which is higher than we'd like, but it is
93     # more important right now to support large directories than to make
94     # memory usage small when you use them. Once we implement MDMF (with
95     # multiple segments), we will drop this back down, probably to 128KiB.
96     MAX_SEGMENT_SIZE = 3500000
97
98     def __init__(self, filenode, servermap):
99         self._node = filenode
100         self._servermap = servermap
101         self._storage_index = self._node.get_storage_index()
102         self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
103         num = self._node._client.log("Publish(%s): starting" % prefix)
104         self._log_number = num
105         self._running = True
106
107         self._status = PublishStatus()
108         self._status.set_storage_index(self._storage_index)
109         self._status.set_helper(False)
110         self._status.set_progress(0.0)
111         self._status.set_active(True)
112         self._status.set_servermap(servermap)
113
114     def log(self, *args, **kwargs):
115         if 'parent' not in kwargs:
116             kwargs['parent'] = self._log_number
117         return log.msg(*args, **kwargs)
118
119     def log_err(self, *args, **kwargs):
120         if 'parent' not in kwargs:
121             kwargs['parent'] = self._log_number
122         return log.err(*args, **kwargs)
123
124     def publish(self, newdata):
125         """Publish the filenode's current contents.  Returns a Deferred that
126         fires (with None) when the publish has done as much work as it's ever
127         going to do, or errbacks with ConsistencyError if it detects a
128         simultaneous write.
129         """
130
131         # 1: generate shares (SDMF: files are small, so we can do it in RAM)
132         # 2: perform peer selection, get candidate servers
133         #  2a: send queries to n+epsilon servers, to determine current shares
134         #  2b: based upon responses, create target map
135         # 3: send slot_testv_and_readv_and_writev messages
136         # 4: as responses return, update share-dispatch table
137         # 4a: may need to run recovery algorithm
138         # 5: when enough responses are back, we're done
139
140         self.log("starting publish, datalen is %s" % len(newdata))
141         self._status.set_size(len(newdata))
142         self._status.set_status("Started")
143         self._started = time.time()
144
145         self.done_deferred = defer.Deferred()
146
147         self._writekey = self._node.get_writekey()
148         assert self._writekey, "need write capability to publish"
149
150         # first, which servers will we publish to? We require that the
151         # servermap was updated in MODE_WRITE, so we can depend upon the
152         # peerlist computed by that process instead of computing our own.
153         if self._servermap:
154             assert self._servermap.last_update_mode == MODE_WRITE
155             # we will push a version that is one larger than anything present
156             # in the grid, according to the servermap.
157             self._new_seqnum = self._servermap.highest_seqnum() + 1
158         else:
159             # If we don't have a servermap, that's because we're doing the
160             # initial publish
161             self._new_seqnum = 1
162             self._servermap = ServerMap()
163
164         self.log(format="new seqnum will be %(seqnum)d",
165                  seqnum=self._new_seqnum, level=log.NOISY)
166
167         # having an up-to-date servermap (or using a filenode that was just
168         # created for the first time) also guarantees that the following
169         # fields are available
170         self.readkey = self._node.get_readkey()
171         self.required_shares = self._node.get_required_shares()
172         assert self.required_shares is not None
173         self.total_shares = self._node.get_total_shares()
174         assert self.total_shares is not None
175         self._status.set_encoding(self.required_shares, self.total_shares)
176
177         self._pubkey = self._node.get_pubkey()
178         assert self._pubkey
179         self._privkey = self._node.get_privkey()
180         assert self._privkey
181         self._encprivkey = self._node.get_encprivkey()
182
183         client = self._node._client
184         full_peerlist = client.get_permuted_peers("storage",
185                                                   self._storage_index)
186         self.full_peerlist = full_peerlist # for use later, immutable
187         self.bad_peers = set() # peerids who have errbacked/refused requests
188
189         self.newdata = newdata
190         self.salt = os.urandom(16)
191
192         self.setup_encoding_parameters()
193
194         self.surprised = False
195
196         # we keep track of three tables. The first is our goal: which share
197         # we want to see on which servers. This is initially populated by the
198         # existing servermap.
199         self.goal = set() # pairs of (peerid, shnum) tuples
200
201         # the second table is our list of outstanding queries: those which
202         # are in flight and may or may not be delivered, accepted, or
203         # acknowledged. Items are added to this table when the request is
204         # sent, and removed when the response returns (or errbacks).
205         self.outstanding = set() # (peerid, shnum) tuples
206
207         # the third is a table of successes: share which have actually been
208         # placed. These are populated when responses come back with success.
209         # When self.placed == self.goal, we're done.
210         self.placed = set() # (peerid, shnum) tuples
211
212         # we also keep a mapping from peerid to RemoteReference. Each time we
213         # pull a connection out of the full peerlist, we add it to this for
214         # use later.
215         self.connections = {}
216
217         # we use the servermap to populate the initial goal: this way we will
218         # try to update each existing share in place.
219         for (peerid, shnum) in self._servermap.servermap:
220             self.goal.add( (peerid, shnum) )
221             self.connections[peerid] = self._servermap.connections[peerid]
222
223         # create the shares. We'll discard these as they are delivered. SMDF:
224         # we're allowed to hold everything in memory.
225
226         self._status.timings["setup"] = time.time() - self._started
227         d = self._encrypt_and_encode()
228         d.addCallback(self._generate_shares)
229         def _start_pushing(res):
230             self._started_pushing = time.time()
231             return res
232         d.addCallback(_start_pushing)
233         d.addCallback(self.loop) # trigger delivery
234         d.addErrback(self._fatal_error)
235
236         return self.done_deferred
237
238     def setup_encoding_parameters(self):
239         segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata))
240         # this must be a multiple of self.required_shares
241         segment_size = mathutil.next_multiple(segment_size,
242                                               self.required_shares)
243         self.segment_size = segment_size
244         if segment_size:
245             self.num_segments = mathutil.div_ceil(len(self.newdata),
246                                                   segment_size)
247         else:
248             self.num_segments = 0
249         assert self.num_segments in [0, 1,] # SDMF restrictions
250
251     def _fatal_error(self, f):
252         self.log("error during loop", failure=f, level=log.SCARY)
253         self._done(f)
254
255     def _update_status(self):
256         self._status.set_status("Sending Shares: %d placed out of %d, "
257                                 "%d messages outstanding" %
258                                 (len(self.placed),
259                                  len(self.goal),
260                                  len(self.outstanding)))
261         self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
262
263     def loop(self, ignored=None):
264         self.log("entering loop", level=log.NOISY)
265         if not self._running:
266             return
267         self.update_goal()
268         # how far are we from our goal?
269         needed = self.goal - self.placed - self.outstanding
270         self._update_status()
271
272         if needed:
273             # we need to send out new shares
274             self.log(format="need to send %(needed)d new shares",
275                      needed=len(needed), level=log.NOISY)
276             d = self._send_shares(needed)
277             d.addCallback(self.loop)
278             d.addErrback(self._fatal_error)
279             return
280
281         if self.outstanding:
282             # queries are still pending, keep waiting
283             self.log(format="%(outstanding)d queries still outstanding",
284                      outstanding=len(self.outstanding),
285                      level=log.NOISY)
286             return
287
288         # no queries outstanding, no placements needed: we're done
289         self.log("no queries outstanding, no placements needed: done",
290                  level=log.OPERATIONAL)
291         now = time.time()
292         elapsed = now - self._started_pushing
293         self._status.timings["push"] = elapsed
294         return self._done(None)
295
296     def log_goal(self, goal):
297         logmsg = []
298         for (peerid, shnum) in goal:
299             logmsg.append("sh%d to [%s]" % (shnum,
300                                             idlib.shortnodeid_b2a(peerid)))
301         self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
302         self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
303                  level=log.NOISY)
304
305     def update_goal(self):
306         # first, remove any bad peers from our goal
307         self.goal = set([ (peerid, shnum)
308                           for (peerid, shnum) in self.goal
309                           if peerid not in self.bad_peers ])
310
311         # find the homeless shares:
312         homefull_shares = set([shnum for (peerid, shnum) in self.goal])
313         homeless_shares = set(range(self.total_shares)) - homefull_shares
314         homeless_shares = sorted(list(homeless_shares))
315         # place them somewhere. We prefer unused servers at the beginning of
316         # the available peer list.
317
318         if not homeless_shares:
319             return
320
321         # if log.recording_noisy
322         if False:
323             self.log_goal(self.goal)
324
325         # if an old share X is on a node, put the new share X there too.
326         # TODO: 1: redistribute shares to achieve one-per-peer, by copying
327         #       shares from existing peers to new (less-crowded) ones. The
328         #       old shares must still be updated.
329         # TODO: 2: move those shares instead of copying them, to reduce future
330         #       update work
331
332         # this is a bit CPU intensive but easy to analyze. We create a sort
333         # order for each peerid. If the peerid is marked as bad, we don't
334         # even put them in the list. Then we care about the number of shares
335         # which have already been assigned to them. After that we care about
336         # their permutation order.
337         old_assignments = DictOfSets()
338         for (peerid, shnum) in self.goal:
339             old_assignments.add(peerid, shnum)
340
341         peerlist = []
342         for i, (peerid, ss) in enumerate(self.full_peerlist):
343             entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
344             peerlist.append(entry)
345         peerlist.sort()
346
347         new_assignments = []
348         # we then index this peerlist with an integer, because we may have to
349         # wrap. We update the goal as we go.
350         i = 0
351         for shnum in homeless_shares:
352             (ignored1, ignored2, peerid, ss) = peerlist[i]
353             self.goal.add( (peerid, shnum) )
354             self.connections[peerid] = ss
355             i += 1
356             if i >= len(peerlist):
357                 i = 0
358
359
360
361     def _encrypt_and_encode(self):
362         # this returns a Deferred that fires with a list of (sharedata,
363         # sharenum) tuples. TODO: cache the ciphertext, only produce the
364         # shares that we care about.
365         self.log("_encrypt_and_encode")
366
367         self._status.set_status("Encrypting")
368         started = time.time()
369
370         key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
371         enc = AES(key)
372         crypttext = enc.process(self.newdata)
373         assert len(crypttext) == len(self.newdata)
374
375         now = time.time()
376         self._status.timings["encrypt"] = now - started
377         started = now
378
379         # now apply FEC
380
381         self._status.set_status("Encoding")
382         fec = codec.CRSEncoder()
383         fec.set_params(self.segment_size,
384                        self.required_shares, self.total_shares)
385         piece_size = fec.get_block_size()
386         crypttext_pieces = [None] * self.required_shares
387         for i in range(len(crypttext_pieces)):
388             offset = i * piece_size
389             piece = crypttext[offset:offset+piece_size]
390             piece = piece + "\x00"*(piece_size - len(piece)) # padding
391             crypttext_pieces[i] = piece
392             assert len(piece) == piece_size
393
394         d = fec.encode(crypttext_pieces)
395         def _done_encoding(res):
396             elapsed = time.time() - started
397             self._status.timings["encode"] = elapsed
398             return res
399         d.addCallback(_done_encoding)
400         return d
401
402     def _generate_shares(self, shares_and_shareids):
403         # this sets self.shares and self.root_hash
404         self.log("_generate_shares")
405         self._status.set_status("Generating Shares")
406         started = time.time()
407
408         # we should know these by now
409         privkey = self._privkey
410         encprivkey = self._encprivkey
411         pubkey = self._pubkey
412
413         (shares, share_ids) = shares_and_shareids
414
415         assert len(shares) == len(share_ids)
416         assert len(shares) == self.total_shares
417         all_shares = {}
418         block_hash_trees = {}
419         share_hash_leaves = [None] * len(shares)
420         for i in range(len(shares)):
421             share_data = shares[i]
422             shnum = share_ids[i]
423             all_shares[shnum] = share_data
424
425             # build the block hash tree. SDMF has only one leaf.
426             leaves = [hashutil.block_hash(share_data)]
427             t = hashtree.HashTree(leaves)
428             block_hash_trees[shnum] = block_hash_tree = list(t)
429             share_hash_leaves[shnum] = t[0]
430         for leaf in share_hash_leaves:
431             assert leaf is not None
432         share_hash_tree = hashtree.HashTree(share_hash_leaves)
433         share_hash_chain = {}
434         for shnum in range(self.total_shares):
435             needed_hashes = share_hash_tree.needed_hashes(shnum)
436             share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
437                                               for i in needed_hashes ] )
438         root_hash = share_hash_tree[0]
439         assert len(root_hash) == 32
440         self.log("my new root_hash is %s" % base32.b2a(root_hash))
441
442         prefix = pack_prefix(self._new_seqnum, root_hash, self.salt,
443                              self.required_shares, self.total_shares,
444                              self.segment_size, len(self.newdata))
445
446         # now pack the beginning of the share. All shares are the same up
447         # to the signature, then they have divergent share hash chains,
448         # then completely different block hash trees + salt + share data,
449         # then they all share the same encprivkey at the end. The sizes
450         # of everything are the same for all shares.
451
452         sign_started = time.time()
453         signature = privkey.sign(prefix)
454         self._status.timings["sign"] = time.time() - sign_started
455
456         verification_key = pubkey.serialize()
457
458         final_shares = {}
459         for shnum in range(self.total_shares):
460             final_share = pack_share(prefix,
461                                      verification_key,
462                                      signature,
463                                      share_hash_chain[shnum],
464                                      block_hash_trees[shnum],
465                                      all_shares[shnum],
466                                      encprivkey)
467             final_shares[shnum] = final_share
468         elapsed = time.time() - started
469         self._status.timings["pack"] = elapsed
470         self.shares = final_shares
471         self.root_hash = root_hash
472
473         # we also need to build up the version identifier for what we're
474         # pushing. Extract the offsets from one of our shares.
475         assert final_shares
476         offsets = unpack_header(final_shares.values()[0])[-1]
477         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
478         verinfo = (self._new_seqnum, root_hash, self.salt,
479                    self.segment_size, len(self.newdata),
480                    self.required_shares, self.total_shares,
481                    prefix, offsets_tuple)
482         self.versioninfo = verinfo
483
484
485
486     def _send_shares(self, needed):
487         self.log("_send_shares")
488
489         # we're finally ready to send out our shares. If we encounter any
490         # surprises here, it's because somebody else is writing at the same
491         # time. (Note: in the future, when we remove the _query_peers() step
492         # and instead speculate about [or remember] which shares are where,
493         # surprises here are *not* indications of UncoordinatedWriteError,
494         # and we'll need to respond to them more gracefully.)
495
496         # needed is a set of (peerid, shnum) tuples. The first thing we do is
497         # organize it by peerid.
498
499         peermap = DictOfSets()
500         for (peerid, shnum) in needed:
501             peermap.add(peerid, shnum)
502
503         # the next thing is to build up a bunch of test vectors. The
504         # semantics of Publish are that we perform the operation if the world
505         # hasn't changed since the ServerMap was constructed (more or less).
506         # For every share we're trying to place, we create a test vector that
507         # tests to see if the server*share still corresponds to the
508         # map.
509
510         all_tw_vectors = {} # maps peerid to tw_vectors
511         sm = self._servermap.servermap
512
513         for key in needed:
514             (peerid, shnum) = key
515
516             if key in sm:
517                 # an old version of that share already exists on the
518                 # server, according to our servermap. We will create a
519                 # request that attempts to replace it.
520                 old_versionid, old_timestamp = sm[key]
521                 (old_seqnum, old_root_hash, old_salt, old_segsize,
522                  old_datalength, old_k, old_N, old_prefix,
523                  old_offsets_tuple) = old_versionid
524                 old_checkstring = pack_checkstring(old_seqnum,
525                                                    old_root_hash,
526                                                    old_salt)
527                 testv = (0, len(old_checkstring), "eq", old_checkstring)
528
529             else:
530                 # add a testv that requires the share not exist
531                 #testv = (0, 1, 'eq', "")
532
533                 # Unfortunately, foolscap-0.2.5 has a bug in the way inbound
534                 # constraints are handled. If the same object is referenced
535                 # multiple times inside the arguments, foolscap emits a
536                 # 'reference' token instead of a distinct copy of the
537                 # argument. The bug is that these 'reference' tokens are not
538                 # accepted by the inbound constraint code. To work around
539                 # this, we need to prevent python from interning the
540                 # (constant) tuple, by creating a new copy of this vector
541                 # each time. This bug is fixed in later versions of foolscap.
542                 testv = tuple([0, 1, 'eq', ""])
543
544             testvs = [testv]
545             # the write vector is simply the share
546             writev = [(0, self.shares[shnum])]
547
548             if peerid not in all_tw_vectors:
549                 all_tw_vectors[peerid] = {}
550                 # maps shnum to (testvs, writevs, new_length)
551             assert shnum not in all_tw_vectors[peerid]
552
553             all_tw_vectors[peerid][shnum] = (testvs, writev, None)
554
555         # we read the checkstring back from each share, however we only use
556         # it to detect whether there was a new share that we didn't know
557         # about. The success or failure of the write will tell us whether
558         # there was a collision or not. If there is a collision, the first
559         # thing we'll do is update the servermap, which will find out what
560         # happened. We could conceivably reduce a roundtrip by using the
561         # readv checkstring to populate the servermap, but really we'd have
562         # to read enough data to validate the signatures too, so it wouldn't
563         # be an overall win.
564         read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]
565
566         # ok, send the messages!
567         started = time.time()
568         dl = []
569         for (peerid, tw_vectors) in all_tw_vectors.items():
570
571             write_enabler = self._node.get_write_enabler(peerid)
572             renew_secret = self._node.get_renewal_secret(peerid)
573             cancel_secret = self._node.get_cancel_secret(peerid)
574             secrets = (write_enabler, renew_secret, cancel_secret)
575             shnums = tw_vectors.keys()
576
577             d = self._do_testreadwrite(peerid, secrets,
578                                        tw_vectors, read_vector)
579             d.addCallbacks(self._got_write_answer, self._got_write_error,
580                            callbackArgs=(peerid, shnums, started),
581                            errbackArgs=(peerid, shnums, started))
582             d.addErrback(self._fatal_error)
583             dl.append(d)
584
585         self._update_status()
586         return defer.DeferredList(dl) # purely for testing
587
588     def _do_testreadwrite(self, peerid, secrets,
589                           tw_vectors, read_vector):
590         storage_index = self._storage_index
591         ss = self.connections[peerid]
592
593         #print "SS[%s] is %s" % (idlib.shortnodeid_b2a(peerid), ss), ss.tracker.interfaceName
594         d = ss.callRemote("slot_testv_and_readv_and_writev",
595                           storage_index,
596                           secrets,
597                           tw_vectors,
598                           read_vector)
599         return d
600
601     def _got_write_answer(self, answer, peerid, shnums, started):
602         lp = self.log("_got_write_answer from %s" %
603                       idlib.shortnodeid_b2a(peerid))
604         for shnum in shnums:
605             self.outstanding.discard( (peerid, shnum) )
606
607         now = time.time()
608         elapsed = now - started
609         self._status.add_per_server_time(peerid, elapsed)
610
611         wrote, read_data = answer
612
613         if not wrote:
614             self.log("our testv failed, so the write did not happen",
615                      parent=lp, level=log.WEIRD)
616             self.surprised = True
617             self.bad_peers.add(peerid) # don't ask them again
618             # use the checkstring to add information to the log message
619             for (shnum,readv) in read_data.items():
620                 checkstring = readv[0]
621                 (other_seqnum,
622                  other_roothash,
623                  other_salt) = unpack_checkstring(checkstring)
624                 expected_version = self._servermap.version_on_peer(peerid,
625                                                                    shnum)
626                 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
627                  offsets_tuple) = expected_version
628                 self.log("somebody modified the share on us:"
629                          " shnum=%d: I thought they had #%d:R=%s,"
630                          " but testv reported #%d:R=%s" %
631                          (shnum,
632                           seqnum, base32.b2a(root_hash)[:4],
633                           other_seqnum, base32.b2a(other_roothash)[:4]),
634                          parent=lp, level=log.NOISY)
635             # self.loop() will take care of finding new homes
636             return
637
638         for shnum in shnums:
639             self.placed.add( (peerid, shnum) )
640             # and update the servermap
641             self._servermap.add_new_share(peerid, shnum,
642                                           self.versioninfo, started)
643
644         surprise_shares = set(read_data.keys()) - set(shnums)
645         if surprise_shares:
646             self.log("they had shares %s that we didn't know about" %
647                      (list(surprise_shares),),
648                      parent=lp, level=log.WEIRD)
649             self.surprised = True
650             return
651
652         # self.loop() will take care of checking to see if we're done
653         return
654
655     def _got_write_error(self, f, peerid, shnums, started):
656         for shnum in shnums:
657             self.outstanding.discard( (peerid, shnum) )
658         self.bad_peers.add(peerid)
659         self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s",
660                  shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid),
661                  failure=f,
662                  level=log.UNUSUAL)
663         # self.loop() will take care of checking to see if we're done
664         return
665
666
667
668     def _log_dispatch_map(self, dispatch_map):
669         for shnum, places in dispatch_map.items():
670             sent_to = [(idlib.shortnodeid_b2a(peerid),
671                         seqnum,
672                         base32.b2a(root_hash)[:4])
673                        for (peerid,seqnum,root_hash) in places]
674             self.log(" share %d sent to: %s" % (shnum, sent_to),
675                      level=log.NOISY)
676
677     def _maybe_recover(self, (surprised, dispatch_map)):
678         self.log("_maybe_recover, surprised=%s, dispatch_map:" % surprised,
679                  level=log.NOISY)
680         self._log_dispatch_map(dispatch_map)
681         if not surprised:
682             self.log(" no recovery needed")
683             return
684         self.log("We need recovery!", level=log.WEIRD)
685         print "RECOVERY NOT YET IMPLEMENTED"
686         # but dispatch_map will help us do it
687         raise UncoordinatedWriteError("I was surprised!")
688
689     def _done(self, res):
690         if not self._running:
691             return
692         self._running = False
693         now = time.time()
694         self._status.timings["total"] = now - self._started
695         self._status.set_active(False)
696         if isinstance(res, failure.Failure):
697             self.log("Retrieve done, with failure", failure=res)
698             self._status.set_status("Failed")
699         else:
700             self._status.set_status("Done")
701             self._status.set_progress(1.0)
702         eventually(self.done_deferred.callback, res)
703
704     def get_status(self):
705         return self._status
706
707