3 import os, struct, time
4 from itertools import count
5 from zope.interface import implements
6 from twisted.internet import defer
7 from twisted.python import failure
8 from allmydata.interfaces import IPublishStatus, FileTooLargeError
9 from allmydata.util import base32, hashutil, mathutil, idlib, log
10 from allmydata import hashtree, codec, storage
11 from pycryptopp.cipher.aes import AES
12 from foolscap.eventual import eventually
14 from common import MODE_WRITE, MODE_CHECK, DictOfSets, \
15 UncoordinatedWriteError, NotEnoughServersError
16 from servermap import ServerMap
17 from layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
18 unpack_checkstring, SIGNED_PREFIX
# --- PublishStatus fragments (the class statement itself is elided from
# this excerpt). zope.interface old-style declaration: implements() is
# invoked inside the class body (Python 2 idiom).
implements(IPublishStatus)
# shared across instances: hands out a unique serial number per status
statusid_counter = count(0)
# attribute defaults, assigned during initialization:
# per-peer write timings: peerid -> list of elapsed seconds
self.timings["send_per_server"] = {}
self.storage_index = None
# (k, n) erasure-coding parameters; "?" until set_encoding() is called
self.encoding = ("?", "?")
self.status = "Not started"
# Python 2 iterator protocol (.next()); unique id for this operation
self.counter = self.statusid_counter.next()
self.started = time.time()
def add_per_server_time(self, peerid, elapsed):
    """Record one per-server write duration.

    Appends 'elapsed' (seconds) to the list of times recorded for
    'peerid' under self.timings["send_per_server"], creating the list
    on first use. setdefault replaces the original membership-test +
    assign, with identical behavior.
    """
    self.timings["send_per_server"].setdefault(peerid, []).append(elapsed)
# Plain accessors and mutators for the status fields, per IPublishStatus.
# NOTE(review): several one-line method bodies are elided in this excerpt;
# the visible ones are simple attribute reads/writes.
def get_started(self):
def get_storage_index(self):
    return self.storage_index
def get_encoding(self):
def using_helper(self):
def get_servermap(self):
def get_progress(self):
def get_counter(self):
def set_storage_index(self, si):
    self.storage_index = si
def set_helper(self, helper):
def set_servermap(self, servermap):
    self.servermap = servermap
def set_encoding(self, k, n):
    # k = required shares, n = total shares
    self.encoding = (k, n)
def set_size(self, size):
def set_status(self, status):
def set_progress(self, value):
def set_active(self, value):
"""I represent a single act of publishing the mutable file to the grid. I
will only publish my data if the servermap I am using still represents
the current state of the world.

To make the initial publish, set servermap to None.
"""

# we limit the segment size as usual to constrain our memory footprint.
# The max segsize is higher for mutable files, because we want to support
# dirnodes with up to 10k children, and each child uses about 330 bytes.
# If you actually put that much into a directory you'll be using a
# footprint of around 14MB, which is higher than we'd like, but it is
# more important right now to support large directories than to make
# memory usage small when you use them. Once we implement MDMF (with
# multiple segments), we will drop this back down, probably to 128KiB.
MAX_SEGMENT_SIZE = 3500000  # bytes; SDMF is single-segment (see publish())
def __init__(self, filenode, servermap):
    """Prepare to publish 'filenode', constrained by 'servermap'.

    filenode: the mutable file node whose contents will be pushed.
    servermap: description of current grid state (None for the very
               first publish of a new file -- see publish()).
    """
    self._node = filenode
    self._servermap = servermap
    self._storage_index = self._node.get_storage_index()
    # short base32 prefix of the storage index, used to tag log messages
    self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
    num = self._node._client.log("Publish(%s): starting" % prefix)
    self._log_number = num
    # status object for progress reporting (IPublishStatus)
    self._status = PublishStatus()
    self._status.set_storage_index(self._storage_index)
    self._status.set_helper(False)  # no helper is used for this publish
    self._status.set_progress(0.0)
    self._status.set_active(True)
# accessor for the PublishStatus object created in __init__
def get_status(self):
def log(self, *args, **kwargs):
    """Forward a message to the node logger, filling in defaults.

    Unless the caller supplied them, 'parent' is set to this Publish
    operation's log number and 'facility' to the mutable-publish
    facility, so all messages from one publish are grouped together.
    """
    kwargs.setdefault('parent', self._log_number)
    kwargs.setdefault("facility", "tahoe.mutable.publish")
    return log.msg(*args, **kwargs)
def publish(self, newdata):
    """Publish the filenode's current contents. Returns a Deferred that
    fires (with None) when the publish has done as much work as it's ever
    going to do, or errbacks with ConsistencyError if it detects a
    """
    # Overall plan:
    # 1: generate shares (SDMF: files are small, so we can do it in RAM)
    # 2: perform peer selection, get candidate servers
    # 2a: send queries to n+epsilon servers, to determine current shares
    # 2b: based upon responses, create target map
    # 3: send slot_testv_and_readv_and_writev messages
    # 4: as responses return, update share-dispatch table
    # 4a: may need to run recovery algorithm
    # 5: when enough responses are back, we're done

    self.log("starting publish, datalen is %s" % len(newdata))
    # SDMF files are limited to a single segment; refuse anything bigger
    if len(newdata) > self.MAX_SEGMENT_SIZE:
        raise FileTooLargeError("SDMF is limited to one segment, and "
                                "%d > %d" % (len(newdata),
                                             self.MAX_SEGMENT_SIZE))
    self._status.set_size(len(newdata))
    self._status.set_status("Started")
    self._started = time.time()

    # fired when the whole publish finishes (see _done)
    self.done_deferred = defer.Deferred()

    self._writekey = self._node.get_writekey()
    assert self._writekey, "need write capability to publish"

    # first, which servers will we publish to? We require that the
    # servermap was updated in MODE_WRITE, so we can depend upon the
    # peerlist computed by that process instead of computing our own.
    assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
    # we will push a version that is one larger than anything present
    # in the grid, according to the servermap.
    self._new_seqnum = self._servermap.highest_seqnum() + 1

    # If we don't have a servermap, that's because we're doing the
    self._servermap = ServerMap()
    self._status.set_servermap(self._servermap)

    self.log(format="new seqnum will be %(seqnum)d",
             seqnum=self._new_seqnum, level=log.NOISY)

    # having an up-to-date servermap (or using a filenode that was just
    # created for the first time) also guarantees that the following
    # fields are available
    self.readkey = self._node.get_readkey()
    self.required_shares = self._node.get_required_shares()
    assert self.required_shares is not None
    self.total_shares = self._node.get_total_shares()
    assert self.total_shares is not None
    self._status.set_encoding(self.required_shares, self.total_shares)

    self._pubkey = self._node.get_pubkey()
    self._privkey = self._node.get_privkey()
    self._encprivkey = self._node.get_encprivkey()

    client = self._node._client
    # NOTE(review): the remaining get_permuted_peers() arguments are
    # elided in this excerpt
    full_peerlist = client.get_permuted_peers("storage",
    self.full_peerlist = full_peerlist # for use later, immutable
    self.bad_peers = set() # peerids who have errbacked/refused requests

    self.newdata = newdata
    # fresh random salt, used for this version's key derivation below
    self.salt = os.urandom(16)

    self.setup_encoding_parameters()

    # if we experience any surprises (writes which were rejected because
    # our test vector did not match, or shares which we didn't expect to
    # see), we set this flag and report an UncoordinatedWriteError at the
    # end of the publish process.
    self.surprised = False

    # as a failsafe, refuse to iterate through self.loop more than a
    self.looplimit = 1000

    # we keep track of three tables. The first is our goal: which share
    # we want to see on which servers. This is initially populated by the
    # existing servermap.
    self.goal = set() # pairs of (peerid, shnum) tuples

    # the second table is our list of outstanding queries: those which
    # are in flight and may or may not be delivered, accepted, or
    # acknowledged. Items are added to this table when the request is
    # sent, and removed when the response returns (or errbacks).
    self.outstanding = set() # (peerid, shnum) tuples

    # the third is a table of successes: share which have actually been
    # placed. These are populated when responses come back with success.
    # When self.placed == self.goal, we're done.
    self.placed = set() # (peerid, shnum) tuples

    # we also keep a mapping from peerid to RemoteReference. Each time we
    # pull a connection out of the full peerlist, we add it to this for
    self.connections = {}

    self.bad_share_checkstrings = {}

    # we use the servermap to populate the initial goal: this way we will
    # try to update each existing share in place.
    for (peerid, shnum) in self._servermap.servermap:
        self.goal.add( (peerid, shnum) )
        self.connections[peerid] = self._servermap.connections[peerid]
    # then we add in all the shares that were bad (corrupted, bad
    # signatures, etc). We want to replace these.
    for (peerid, shnum, old_checkstring) in self._servermap.bad_shares:
        self.goal.add( (peerid, shnum) )
        self.bad_share_checkstrings[ (peerid, shnum) ] = old_checkstring
        self.connections[peerid] = self._servermap.connections[peerid]

    # create the shares. We'll discard these as they are delivered. SDMF:
    # we're allowed to hold everything in memory.

    self._status.timings["setup"] = time.time() - self._started
    d = self._encrypt_and_encode()
    d.addCallback(self._generate_shares)
    def _start_pushing(res):
        # record when network delivery begins, for the "push" timing
        self._started_pushing = time.time()
    d.addCallback(_start_pushing)
    d.addCallback(self.loop) # trigger delivery
    d.addErrback(self._fatal_error)

    return self.done_deferred
def setup_encoding_parameters(self):
    # Derive segment_size / num_segments from the data length. SDMF
    # uses at most one segment (asserted below).
    segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata))
    # this must be a multiple of self.required_shares
    segment_size = mathutil.next_multiple(segment_size,
                                          self.required_shares)
    self.segment_size = segment_size
    # NOTE(review): the branch distinguishing empty from non-empty data
    # is elided in this excerpt
    self.num_segments = mathutil.div_ceil(len(self.newdata),
    self.num_segments = 0
    assert self.num_segments in [0, 1,] # SDMF restrictions
def _fatal_error(self, f):
    # last-resort errback for the publish pipeline: log the Failure 'f'
    # at high severity
    self.log("error during loop", failure=f, level=log.UNUSUAL)
def _update_status(self):
    # Refresh the human-readable status string and the progress fraction
    # (placed shares over the current goal size).
    self._status.set_status("Sending Shares: %d placed out of %d, "
                            "%d messages outstanding" %
                            len(self.outstanding)))
    self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
def loop(self, ignored=None):
    # Main driver, re-entered after every server response: compute which
    # (peerid, shnum) pairs still need sending, send them, and finish
    # when nothing is needed and nothing is outstanding. 'ignored'
    # absorbs the Deferred result when used as a callback.
    # NOTE(review): the branch headers are elided in this excerpt.
    self.log("entering loop", level=log.NOISY)
    if not self._running:

    # failsafe against infinite re-entry (see self.looplimit)
    if self.looplimit <= 0:
        raise RuntimeError("loop limit exceeded")

    # don't send out any new shares, just wait for the outstanding
    # ones to be retired.
    self.log("currently surprised, so don't send any new shares",

    # how far are we from our goal?
    needed = self.goal - self.placed - self.outstanding
    self._update_status()

    # we need to send out new shares
    self.log(format="need to send %(needed)d new shares",
             needed=len(needed), level=log.NOISY)
    self._send_shares(needed)

    # queries are still pending, keep waiting
    self.log(format="%(outstanding)d queries still outstanding",
             outstanding=len(self.outstanding),

    # no queries outstanding, no placements needed: we're done
    self.log("no queries outstanding, no placements needed: done",
             level=log.OPERATIONAL)
    elapsed = now - self._started_pushing
    self._status.timings["push"] = elapsed
    return self._done(None)
def log_goal(self, goal, message=""):
    # Log the goal set as "shN to [peer]" entries, sorted by share
    # number, plus the seqnum we plan to push.
    for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]):
        logmsg.append("sh%d to [%s]" % (shnum,
                                        idlib.shortnodeid_b2a(peerid)))
    self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
    self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
def update_goal(self):
    # Recompute self.goal: drop entries for bad peers, then assign every
    # homeless share to the least-loaded usable peers (with permuted
    # peerlist order as the tiebreaker).
    # if log.recording_noisy
    self.log_goal(self.goal, "before update: ")

    # first, remove any bad peers from our goal
    self.goal = set([ (peerid, shnum)
                      for (peerid, shnum) in self.goal
                      if peerid not in self.bad_peers ])

    # find the homeless shares:
    homefull_shares = set([shnum for (peerid, shnum) in self.goal])
    homeless_shares = set(range(self.total_shares)) - homefull_shares
    homeless_shares = sorted(list(homeless_shares))
    # place them somewhere. We prefer unused servers at the beginning of
    # the available peer list.
    if not homeless_shares:

    # if an old share X is on a node, put the new share X there too.
    # TODO: 1: redistribute shares to achieve one-per-peer, by copying
    #       shares from existing peers to new (less-crowded) ones. The
    #       old shares must still be updated.
    # TODO: 2: move those shares instead of copying them, to reduce future

    # this is a bit CPU intensive but easy to analyze. We create a sort
    # order for each peerid. If the peerid is marked as bad, we don't
    # even put them in the list. Then we care about the number of shares
    # which have already been assigned to them. After that we care about
    # their permutation order.
    old_assignments = DictOfSets()
    for (peerid, shnum) in self.goal:
        old_assignments.add(peerid, shnum)

    for i, (peerid, ss) in enumerate(self.full_peerlist):
        if peerid in self.bad_peers:
        # sort key: (current load, permuted position, peerid, connection)
        entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
        peerlist.append(entry)

    raise NotEnoughServersError("Ran out of non-bad servers")

    # we then index this peerlist with an integer, because we may have to
    # wrap. We update the goal as we go.
    for shnum in homeless_shares:
        (ignored1, ignored2, peerid, ss) = peerlist[i]
        # TODO: if we are forced to send a share to a server that already
        # has one, we may have two write requests in flight, and the
        # servermap (which was computed before either request was sent)
        # won't reflect the new shares, so the second response will cause
        # us to be surprised ("unexpected share on peer"), causing the
        # publish to fail with an UncoordinatedWriteError. This is
        # troublesome but not really a bit problem. Fix it at some point.
        self.goal.add( (peerid, shnum) )
        self.connections[peerid] = ss

    if i >= len(peerlist):

    self.log_goal(self.goal, "after update: ")
def _encrypt_and_encode(self):
    # this returns a Deferred that fires with a list of (sharedata,
    # sharenum) tuples. TODO: cache the ciphertext, only produce the
    # shares that we care about.
    self.log("_encrypt_and_encode")

    self._status.set_status("Encrypting")
    started = time.time()

    # per-version data key, derived from the fresh salt and the readkey
    key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
    # NOTE(review): 'enc' is presumably AES(key) (pycryptopp) -- its
    # construction is elided in this excerpt
    crypttext = enc.process(self.newdata)
    assert len(crypttext) == len(self.newdata)

    self._status.timings["encrypt"] = now - started

    self._status.set_status("Encoding")
    fec = codec.CRSEncoder()
    fec.set_params(self.segment_size,
                   self.required_shares, self.total_shares)
    piece_size = fec.get_block_size()
    # split the ciphertext into k equal pieces, zero-padding the last
    crypttext_pieces = [None] * self.required_shares
    for i in range(len(crypttext_pieces)):
        offset = i * piece_size
        piece = crypttext[offset:offset+piece_size]
        piece = piece + "\x00"*(piece_size - len(piece)) # padding
        crypttext_pieces[i] = piece
        assert len(piece) == piece_size

    d = fec.encode(crypttext_pieces)
    def _done_encoding(res):
        # record FEC time; res carries (shares, share_ids)
        elapsed = time.time() - started
        self._status.timings["encode"] = elapsed
    d.addCallback(_done_encoding)
def _generate_shares(self, shares_and_shareids):
    # this sets self.shares and self.root_hash
    # Input: (shares, share_ids) from the FEC encoder. Builds the block
    # hash trees, the share hash tree/chain, signs the prefix, packs the
    # final shares, and records self.versioninfo.
    # NOTE(review): initialization of 'all_shares', 'shnum' and
    # 'final_shares' is elided in this excerpt.
    self.log("_generate_shares")
    self._status.set_status("Generating Shares")
    started = time.time()

    # we should know these by now
    privkey = self._privkey
    encprivkey = self._encprivkey
    pubkey = self._pubkey

    (shares, share_ids) = shares_and_shareids

    assert len(shares) == len(share_ids)
    assert len(shares) == self.total_shares

    block_hash_trees = {}
    share_hash_leaves = [None] * len(shares)
    for i in range(len(shares)):
        share_data = shares[i]
        all_shares[shnum] = share_data

        # build the block hash tree. SDMF has only one leaf.
        leaves = [hashutil.block_hash(share_data)]
        t = hashtree.HashTree(leaves)
        block_hash_trees[shnum] = block_hash_tree = list(t)
        share_hash_leaves[shnum] = t[0]
    for leaf in share_hash_leaves:
        assert leaf is not None
    share_hash_tree = hashtree.HashTree(share_hash_leaves)
    share_hash_chain = {}
    for shnum in range(self.total_shares):
        needed_hashes = share_hash_tree.needed_hashes(shnum)
        share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
                                          for i in needed_hashes ] )
    root_hash = share_hash_tree[0]
    assert len(root_hash) == 32
    self.log("my new root_hash is %s" % base32.b2a(root_hash))

    prefix = pack_prefix(self._new_seqnum, root_hash, self.salt,
                         self.required_shares, self.total_shares,
                         self.segment_size, len(self.newdata))

    # now pack the beginning of the share. All shares are the same up
    # to the signature, then they have divergent share hash chains,
    # then completely different block hash trees + salt + share data,
    # then they all share the same encprivkey at the end. The sizes
    # of everything are the same for all shares.

    sign_started = time.time()
    signature = privkey.sign(prefix)
    self._status.timings["sign"] = time.time() - sign_started

    verification_key = pubkey.serialize()

    for shnum in range(self.total_shares):
        # NOTE(review): some pack_share() arguments are elided here
        final_share = pack_share(prefix,
                                 share_hash_chain[shnum],
                                 block_hash_trees[shnum],
        final_shares[shnum] = final_share
    elapsed = time.time() - started
    self._status.timings["pack"] = elapsed
    self.shares = final_shares
    self.root_hash = root_hash

    # we also need to build up the version identifier for what we're
    # pushing. Extract the offsets from one of our shares.
    offsets = unpack_header(final_shares.values()[0])[-1]
    offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
    verinfo = (self._new_seqnum, root_hash, self.salt,
               self.segment_size, len(self.newdata),
               self.required_shares, self.total_shares,
               prefix, offsets_tuple)
    self.versioninfo = verinfo
def _send_shares(self, needed):
    # Build per-peer test/write vectors for every (peerid, shnum) in
    # 'needed' and fire off the slot_testv_and_readv_and_writev calls.
    # NOTE(review): several loop/branch headers are elided in this
    # excerpt.
    self.log("_send_shares")

    # we're finally ready to send out our shares. If we encounter any
    # surprises here, it's because somebody else is writing at the same
    # time. (Note: in the future, when we remove the _query_peers() step
    # and instead speculate about [or remember] which shares are where,
    # surprises here are *not* indications of UncoordinatedWriteError,
    # and we'll need to respond to them more gracefully.)

    # needed is a set of (peerid, shnum) tuples. The first thing we do is
    # organize it by peerid.

    peermap = DictOfSets()
    for (peerid, shnum) in needed:
        peermap.add(peerid, shnum)

    # the next thing is to build up a bunch of test vectors. The
    # semantics of Publish are that we perform the operation if the world
    # hasn't changed since the ServerMap was constructed (more or less).
    # For every share we're trying to place, we create a test vector that
    # tests to see if the server*share still corresponds to the

    all_tw_vectors = {} # maps peerid to tw_vectors
    sm = self._servermap.servermap

    (peerid, shnum) = key

    # an old version of that share already exists on the
    # server, according to our servermap. We will create a
    # request that attempts to replace it.
    old_versionid, old_timestamp = sm[key]
    (old_seqnum, old_root_hash, old_salt, old_segsize,
     old_datalength, old_k, old_N, old_prefix,
     old_offsets_tuple) = old_versionid
    old_checkstring = pack_checkstring(old_seqnum,
    # test vector: write only if the existing checkstring still matches
    testv = (0, len(old_checkstring), "eq", old_checkstring)

    elif key in self.bad_share_checkstrings:
        # replace a known-bad share: require its recorded checkstring
        old_checkstring = self.bad_share_checkstrings[key]
        testv = (0, len(old_checkstring), "eq", old_checkstring)

    # add a testv that requires the share not exist
    #testv = (0, 1, 'eq', "")

    # Unfortunately, foolscap-0.2.5 has a bug in the way inbound
    # constraints are handled. If the same object is referenced
    # multiple times inside the arguments, foolscap emits a
    # 'reference' token instead of a distinct copy of the
    # argument. The bug is that these 'reference' tokens are not
    # accepted by the inbound constraint code. To work around
    # this, we need to prevent python from interning the
    # (constant) tuple, by creating a new copy of this vector
    # each time. This bug is fixed in later versions of foolscap.
    testv = tuple([0, 1, 'eq', ""])

    # the write vector is simply the share
    writev = [(0, self.shares[shnum])]

    if peerid not in all_tw_vectors:
        all_tw_vectors[peerid] = {}
        # maps shnum to (testvs, writevs, new_length)
    assert shnum not in all_tw_vectors[peerid]

    all_tw_vectors[peerid][shnum] = (testvs, writev, None)

    # we read the checkstring back from each share, however we only use
    # it to detect whether there was a new share that we didn't know
    # about. The success or failure of the write will tell us whether
    # there was a collision or not. If there is a collision, the first
    # thing we'll do is update the servermap, which will find out what
    # happened. We could conceivably reduce a roundtrip by using the
    # readv checkstring to populate the servermap, but really we'd have
    # to read enough data to validate the signatures too, so it wouldn't
    read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]

    # ok, send the messages!
    self.log("sending %d shares" % len(all_tw_vectors), level=log.NOISY)
    started = time.time()
    for (peerid, tw_vectors) in all_tw_vectors.items():

        # per-peer secrets gate the write on the server side
        write_enabler = self._node.get_write_enabler(peerid)
        renew_secret = self._node.get_renewal_secret(peerid)
        cancel_secret = self._node.get_cancel_secret(peerid)
        secrets = (write_enabler, renew_secret, cancel_secret)
        shnums = tw_vectors.keys()

        self.outstanding.add( (peerid, shnum) )

        d = self._do_testreadwrite(peerid, secrets,
                                   tw_vectors, read_vector)
        d.addCallbacks(self._got_write_answer, self._got_write_error,
                       callbackArgs=(peerid, shnums, started),
                       errbackArgs=(peerid, shnums, started))
        d.addCallback(self.loop)
        d.addErrback(self._fatal_error)

    self._update_status()
    self.log("%d shares sent" % len(all_tw_vectors), level=log.NOISY)
def _do_testreadwrite(self, peerid, secrets,
                      tw_vectors, read_vector):
    # Issue the combined test/read/write RPC to one storage server over
    # the connection cached for 'peerid'. Remaining callRemote arguments
    # and the return are elided in this excerpt.
    storage_index = self._storage_index
    ss = self.connections[peerid]

    #print "SS[%s] is %s" % (idlib.shortnodeid_b2a(peerid), ss), ss.tracker.interfaceName
    d = ss.callRemote("slot_testv_and_readv_and_writev",
def _got_write_answer(self, answer, peerid, shnums, started):
    # Process one slot_testv_and_readv_and_writev response: retire the
    # outstanding entries, record timing, detect surprise shares and
    # failed test vectors (both set self.surprised), and record
    # successful placements. NOTE(review): several loop/branch headers
    # are elided in this excerpt.
    lp = self.log("_got_write_answer from %s" %
                  idlib.shortnodeid_b2a(peerid))
    self.outstanding.discard( (peerid, shnum) )

    elapsed = now - started
    self._status.add_per_server_time(peerid, elapsed)

    # answer is (wrote: bool, read_data: dict of shnum->checkstring)
    wrote, read_data = answer

    surprise_shares = set(read_data.keys()) - set(shnums)
    self.log("they had shares %s that we didn't know about" %
             (list(surprise_shares),),
             parent=lp, level=log.WEIRD, umid="un9CSQ")
    self.surprised = True

    # TODO: there are two possibilities. The first is that the server
    # is full (or just doesn't want to give us any room), which means
    # we shouldn't ask them again, but is *not* an indication of an
    # uncoordinated write. The second is that our testv failed, which
    # *does* indicate an uncoordinated write. We currently don't have
    # a way to tell these two apart (in fact, the storage server code
    # doesn't have the option of refusing our share).
    #
    # If the server is full, mark the peer as bad (so we don't ask
    # them again), but don't set self.surprised. The loop() will find
    #
    # If the testv failed, log it, set self.surprised, but don't
    # bother adding to self.bad_peers .

    self.log("our testv failed, so the write did not happen",
             parent=lp, level=log.WEIRD, umid="8sc26g")
    self.surprised = True
    self.bad_peers.add(peerid) # don't ask them again
    # use the checkstring to add information to the log message
    for (shnum,readv) in read_data.items():
        checkstring = readv[0]
        other_salt) = unpack_checkstring(checkstring)
        expected_version = self._servermap.version_on_peer(peerid,
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = expected_version
        self.log("somebody modified the share on us:"
                 " shnum=%d: I thought they had #%d:R=%s,"
                 " but testv reported #%d:R=%s" %
                 seqnum, base32.b2a(root_hash)[:4],
                 other_seqnum, base32.b2a(other_roothash)[:4]),
                 parent=lp, level=log.NOISY)
        # if expected_version==None, then we didn't expect to see a
        # share on that peer, and the 'surprise_shares' clause above
        # will have logged it.
        # self.loop() will take care of finding new homes

    self.placed.add( (peerid, shnum) )
    # and update the servermap
    self._servermap.add_new_share(peerid, shnum,
                                  self.versioninfo, started)

    # self.loop() will take care of checking to see if we're done
def _got_write_error(self, f, peerid, shnums, started):
    # errback: the peer failed or refused the request entirely. Retire
    # its outstanding entries and mark it bad so loop() picks new homes
    # for its shares. (The loop header over shnums is elided here.)
    self.outstanding.discard( (peerid, shnum) )
    self.bad_peers.add(peerid)
    self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s",
             shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid),
    # self.loop() will take care of checking to see if we're done
def _done(self, res):
    # Finish the publish: stop running, record total elapsed time, set
    # the final status string, and (via the eventual-send queue) fire
    # done_deferred with 'res'. An UncoordinatedWriteError Failure is
    # substituted when self.surprised was set. NOTE(review): the branch
    # headers are elided in this excerpt.
    if not self._running:
    self._running = False
    self._status.timings["total"] = now - self._started
    self._status.set_active(False)
    if isinstance(res, failure.Failure):
        self.log("Publish done, with failure", failure=res,
                 level=log.WEIRD, umid="nRsR9Q")
        self._status.set_status("Failed")
    self.log("Publish done, UncoordinatedWriteError", level=log.UNUSUAL)
    self._status.set_status("UncoordinatedWriteError")
    res = failure.Failure(UncoordinatedWriteError())
    self.log("Publish done, success")
    self._status.set_status("Done")
    self._status.set_progress(1.0)
    # eventual-send so callbacks run from a clean reactor turn
    eventually(self.done_deferred.callback, res)