import os, time
from StringIO import StringIO
5 from itertools import count
6 from zope.interface import implements
7 from twisted.internet import defer
8 from twisted.python import failure
from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION, \
                                 IMutableUploadable
11 from allmydata.util import base32, hashutil, mathutil, idlib, log
12 from allmydata.util.dictutil import DictOfSets
13 from allmydata import hashtree, codec
14 from allmydata.storage.server import si_b2a
15 from pycryptopp.cipher.aes import AES
16 from foolscap.api import eventually, fireEventually
18 from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, \
19 UncoordinatedWriteError, NotEnoughServersError
20 from allmydata.mutable.servermap import ServerMap
from allmydata.mutable.layout import unpack_checkstring, MDMFSlotWriteProxy, \
                                     SDMFSlotWriteProxy

KiB = 1024
DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
26 PUSHING_BLOCKS_STATE = 0
27 PUSHING_EVERYTHING_ELSE_STATE = 1
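
# Illustrative sketch (not part of the original module): the segment
# arithmetic that Publish.setup_encoding_parameters() performs below,
# pulled out as a standalone function. The name and signature are
# hypothetical, for exposition only.
def _example_segment_math(datalength, required_shares, version=MDMF_VERSION):
    if version == MDMF_VERSION:
        segment_size = DEFAULT_MAX_SEGMENT_SIZE
    else:
        segment_size = datalength # SDMF is a single segment
    # round the segment size up to a multiple of k, so each segment
    # divides evenly into the k FEC input pieces
    segment_size = mathutil.next_multiple(segment_size, required_shares)
    if segment_size:
        num_segments = mathutil.div_ceil(datalength, segment_size)
    else:
        num_segments = 0
    return segment_size, num_segments

# e.g. _example_segment_math(1000000, 3) == (131073, 8): the segment size
# rounds up from 131072 to the next multiple of 3, and div_ceil counts the
# partial tail segment that plain integer division would drop.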
class PublishStatus:
    implements(IPublishStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["send_per_server"] = {}
        self.servermap = None
        self.active = True
        self.storage_index = None
        self.helper = False
        self.encoding = ("?", "?")
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def add_per_server_time(self, peerid, elapsed):
        if peerid not in self.timings["send_per_server"]:
            self.timings["send_per_server"][peerid] = []
        self.timings["send_per_server"][peerid].append(elapsed)
    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_encoding(self):
        return self.encoding
    def using_helper(self):
        return self.helper
    def get_servermap(self):
        return self.servermap
    def get_progress(self):
        return self.progress
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
        self.helper = helper
    def set_servermap(self, servermap):
        self.servermap = servermap
    def set_encoding(self, k, n):
        self.encoding = (k, n)
    def set_size(self, size):
        self.size = size
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value

class LoopLimitExceededError(Exception):
    pass


class Publish:
    """I represent a single act of publishing the mutable file to the grid. I
    will only publish my data if the servermap I am using still represents
    the current state of the world.

    To make the initial publish, set servermap to None.
    """
102 def __init__(self, filenode, storage_broker, servermap):
103 self._node = filenode
104 self._storage_broker = storage_broker
105 self._servermap = servermap
106 self._storage_index = self._node.get_storage_index()
107 self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
108 num = self.log("Publish(%s): starting" % prefix, parent=None)
109 self._log_number = num
111 self._first_write_error = None
112 self._last_failure = None
114 self._status = PublishStatus()
115 self._status.set_storage_index(self._storage_index)
116 self._status.set_helper(False)
117 self._status.set_progress(0.0)
118 self._status.set_active(True)
119 self._version = self._node.get_version()
120 assert self._version in (SDMF_VERSION, MDMF_VERSION)
    def get_status(self):
        return self._status
126 def log(self, *args, **kwargs):
127 if 'parent' not in kwargs:
128 kwargs['parent'] = self._log_number
129 if "facility" not in kwargs:
130 kwargs["facility"] = "tahoe.mutable.publish"
131 return log.msg(*args, **kwargs)
    def update(self, data, offset, blockhashes, version):
        """
        I replace the contents of this file with the contents of data,
        starting at offset. I return a Deferred that fires with None
        when the replacement has been completed, or with an error if
        something went wrong during the process.

        Note that this process will not upload new shares. If the file
        being updated is in need of repair, callers will have to repair
        it first.
        """
        # 1. Make peer assignments. We'll assign each share that we know
        #    about on the grid to the peer that currently holds that
        #    share, and will not place any new shares.
        # 2. Set up encoding parameters. Most of these will stay the same
        #    -- datalength will change, as will some of the offsets.
        # 3. Upload the new segments.
        assert IMutableUploadable.providedBy(data)

        self.data = data

        # XXX: Use the MutableFileVersion instead.
158 self.datalength = self._node.get_size()
159 if data.get_size() > self.datalength:
160 self.datalength = data.get_size()
162 self.log("starting update")
163 self.log("adding new data of length %d at offset %d" % \
164 (data.get_size(), offset))
165 self.log("new data length is %d" % self.datalength)
166 self._status.set_size(self.datalength)
167 self._status.set_status("Started")
168 self._started = time.time()
170 self.done_deferred = defer.Deferred()
172 self._writekey = self._node.get_writekey()
173 assert self._writekey, "need write capability to publish"
175 # first, which servers will we publish to? We require that the
176 # servermap was updated in MODE_WRITE, so we can depend upon the
177 # peerlist computed by that process instead of computing our own.
178 assert self._servermap
179 assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
180 # we will push a version that is one larger than anything present
181 # in the grid, according to the servermap.
182 self._new_seqnum = self._servermap.highest_seqnum() + 1
183 self._status.set_servermap(self._servermap)
185 self.log(format="new seqnum will be %(seqnum)d",
186 seqnum=self._new_seqnum, level=log.NOISY)
188 # We're updating an existing file, so all of the following
189 # should be available.
190 self.readkey = self._node.get_readkey()
191 self.required_shares = self._node.get_required_shares()
192 assert self.required_shares is not None
193 self.total_shares = self._node.get_total_shares()
194 assert self.total_shares is not None
195 self._status.set_encoding(self.required_shares, self.total_shares)
        self._pubkey = self._node.get_pubkey()
        assert self._pubkey
        self._privkey = self._node.get_privkey()
        assert self._privkey
        self._encprivkey = self._node.get_encprivkey()
203 sb = self._storage_broker
204 full_peerlist = [(s.get_serverid(), s.get_rref())
205 for s in sb.get_servers_for_psi(self._storage_index)]
206 self.full_peerlist = full_peerlist # for use later, immutable
207 self.bad_peers = set() # peerids who have errbacked/refused requests
209 # This will set self.segment_size, self.num_segments, and
210 # self.fec. TODO: Does it know how to do the offset? Probably
211 # not. So do that part next.
212 self.setup_encoding_parameters(offset=offset)
214 # if we experience any surprises (writes which were rejected because
215 # our test vector did not match, or shares which we didn't expect to
216 # see), we set this flag and report an UncoordinatedWriteError at the
217 # end of the publish process.
218 self.surprised = False
220 # we keep track of three tables. The first is our goal: which share
221 # we want to see on which servers. This is initially populated by the
222 # existing servermap.
223 self.goal = set() # pairs of (peerid, shnum) tuples
225 # the second table is our list of outstanding queries: those which
226 # are in flight and may or may not be delivered, accepted, or
227 # acknowledged. Items are added to this table when the request is
228 # sent, and removed when the response returns (or errbacks).
229 self.outstanding = set() # (peerid, shnum) tuples
        # the third is a table of successes: shares which have actually been
        # placed. These are populated when responses come back with success.
233 # When self.placed == self.goal, we're done.
234 self.placed = set() # (peerid, shnum) tuples
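
        # Illustrative sketch of how these three tables relate (the local
        # names here are hypothetical):
        #
        #   still_needed = self.goal - self.placed - self.outstanding
        #   done = (self.placed == self.goal)
        #
        # Each server response moves its (peerid, shnum) pair out of
        # self.outstanding, and into self.placed if the write succeeded.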
        # we also keep a mapping from peerid to RemoteReference. Each time we
        # pull a connection out of the full peerlist, we add it to this for
        # use.
        self.connections = {}
241 self.bad_share_checkstrings = {}
243 # This is set at the last step of the publishing process.
244 self.versioninfo = ""
        # we use the servermap to populate the initial goal: this way we will
        # try to update each existing share in place. Since we're
        # updating, we ignore damaged and missing shares -- callers must
        # run a repair to recreate them.
250 for (peerid, shnum) in self._servermap.servermap:
251 self.goal.add( (peerid, shnum) )
252 self.connections[peerid] = self._servermap.connections[peerid]
255 # SDMF files are updated differently.
256 self._version = MDMF_VERSION
257 writer_class = MDMFSlotWriteProxy
259 # For each (peerid, shnum) in self.goal, we make a
260 # write proxy for that peer. We'll use this to write
261 # shares to the peer.
        self.writers = {}
        for key in self.goal:
            peerid, shnum = key
            write_enabler = self._node.get_write_enabler(peerid)
            renew_secret = self._node.get_renewal_secret(peerid)
            cancel_secret = self._node.get_cancel_secret(peerid)
            secrets = (write_enabler, renew_secret, cancel_secret)
            self.writers[shnum] = writer_class(shnum, self.connections[peerid],
                                               self._storage_index, secrets,
                                               self._new_seqnum,
                                               self.required_shares,
                                               self.total_shares,
                                               self.segment_size, self.datalength)
            self.writers[shnum].peerid = peerid
279 assert (peerid, shnum) in self._servermap.servermap
280 old_versionid, old_timestamp = self._servermap.servermap[key]
281 (old_seqnum, old_root_hash, old_salt, old_segsize,
282 old_datalength, old_k, old_N, old_prefix,
283 old_offsets_tuple) = old_versionid
            self.writers[shnum].set_checkstring(old_seqnum,
                                                old_root_hash,
                                                old_salt)
288 # Our remote shares will not have a complete checkstring until
289 # after we are done writing share data and have started to write
290 # blocks. In the meantime, we need to know what to look for when
291 # writing, so that we can detect UncoordinatedWriteErrors.
292 self._checkstring = self.writers.values()[0].get_checkstring()
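
        # Illustrative sketch: a checkstring packs (seqnum, root_hash,
        # salt), so spotting a rival writer is a comparison of packed
        # strings, roughly (hypothetical local names):
        #
        #   their_checkstring = readv[0]
        #   if their_checkstring != self._checkstring:
        #       self.surprised = True
        #
        # unpack_checkstring() (imported above) recovers the tuple when
        # the individual fields are needed, e.g. for logging.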
294 # Now, we start pushing shares.
295 self._status.timings["setup"] = time.time() - self._started
296 # First, we encrypt, encode, and publish the shares that we need
297 # to encrypt, encode, and publish.
299 # Our update process fetched these for us. We need to update
300 # them in place as publishing happens.
        self.blockhashes = {} # (shnum, [blockhashes])
302 for (i, bht) in blockhashes.iteritems():
303 # We need to extract the leaves from our old hash tree.
            old_segcount = mathutil.div_ceil(version[4],
                                             version[3])
            h = hashtree.IncompleteHashTree(old_segcount)
            bht = dict(enumerate(bht))
            h.set_hashes(bht)
            leaves = h[h.get_leaf_index(0):]
            for j in xrange(self.num_segments - len(leaves)):
                leaves.append(None)

            assert len(leaves) >= self.num_segments
314 self.blockhashes[i] = leaves
            # This list will now be the leaves that were set during the
            # initial upload + enough empty hashes to make it a
            # power-of-two. If we exceed a power of two boundary, we
            # should be encoding the file over again, and should not be
            # here.
320 #assert len(self.blockhashes[i]) == \
321 # hashtree.roundup_pow2(self.num_segments), \
322 # len(self.blockhashes[i])
323 # XXX: Except this doesn't work. Figure out why.
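            # Illustrative example of the padding above, with hypothetical
            # numbers: if the old tree covered 6 segments and the updated
            # file spans 8, we keep the 6 known leaves and append two
            # placeholders, which _push_segment() overwrites for whichever
            # segments actually get rewritten:
            #
            #   leaves = [h0, h1, h2, h3, h4, h5]
            #   leaves += [None] * (8 - 6)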
        # These are filled in later, after we've modified the block hash
        # tree.
        self.sharehash_leaves = None # eventually [sharehashes]
328 self.sharehashes = {} # shnum -> [sharehash leaves necessary to
329 # validate the share]
331 self.log("Starting push")
        self._state = PUSHING_BLOCKS_STATE
        self._push()

        return self.done_deferred
    def publish(self, newdata):
        """Publish the filenode's current contents. Returns a Deferred that
        fires (with None) when the publish has done as much work as it's ever
        going to do, or errbacks with ConsistencyError if it detects a
        simultaneous write.
        """

        # 0. Setup encoding parameters, encoder, and other such things.
347 # 1. Encrypt, encode, and publish segments.
        assert IMutableUploadable.providedBy(newdata)

        self.data = newdata
        self.datalength = newdata.get_size()
        #if self.datalength >= DEFAULT_MAX_SEGMENT_SIZE:
        #    self._version = MDMF_VERSION
        #else:
        #    self._version = SDMF_VERSION
357 self.log("starting publish, datalen is %s" % self.datalength)
358 self._status.set_size(self.datalength)
359 self._status.set_status("Started")
360 self._started = time.time()
362 self.done_deferred = defer.Deferred()
364 self._writekey = self._node.get_writekey()
365 assert self._writekey, "need write capability to publish"
        # first, which servers will we publish to? We require that the
        # servermap was updated in MODE_WRITE, so we can depend upon the
        # peerlist computed by that process instead of computing our own.
        if self._servermap:
            assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
            # we will push a version that is one larger than anything present
            # in the grid, according to the servermap.
            self._new_seqnum = self._servermap.highest_seqnum() + 1
        else:
            # If we don't have a servermap, that's because we're doing the
            # initial publish
            self._new_seqnum = 1
            self._servermap = ServerMap()
        self._status.set_servermap(self._servermap)
382 self.log(format="new seqnum will be %(seqnum)d",
383 seqnum=self._new_seqnum, level=log.NOISY)
385 # having an up-to-date servermap (or using a filenode that was just
386 # created for the first time) also guarantees that the following
387 # fields are available
388 self.readkey = self._node.get_readkey()
389 self.required_shares = self._node.get_required_shares()
390 assert self.required_shares is not None
391 self.total_shares = self._node.get_total_shares()
392 assert self.total_shares is not None
393 self._status.set_encoding(self.required_shares, self.total_shares)
        self._pubkey = self._node.get_pubkey()
        assert self._pubkey
        self._privkey = self._node.get_privkey()
        assert self._privkey
        self._encprivkey = self._node.get_encprivkey()
401 sb = self._storage_broker
402 full_peerlist = [(s.get_serverid(), s.get_rref())
403 for s in sb.get_servers_for_psi(self._storage_index)]
404 self.full_peerlist = full_peerlist # for use later, immutable
405 self.bad_peers = set() # peerids who have errbacked/refused requests
        # This will set self.segment_size, self.num_segments, and
        # self.fec.
        self.setup_encoding_parameters()
411 # if we experience any surprises (writes which were rejected because
412 # our test vector did not match, or shares which we didn't expect to
413 # see), we set this flag and report an UncoordinatedWriteError at the
414 # end of the publish process.
415 self.surprised = False
417 # we keep track of three tables. The first is our goal: which share
418 # we want to see on which servers. This is initially populated by the
419 # existing servermap.
420 self.goal = set() # pairs of (peerid, shnum) tuples
422 # the second table is our list of outstanding queries: those which
423 # are in flight and may or may not be delivered, accepted, or
424 # acknowledged. Items are added to this table when the request is
425 # sent, and removed when the response returns (or errbacks).
426 self.outstanding = set() # (peerid, shnum) tuples
        # the third is a table of successes: shares which have actually been
        # placed. These are populated when responses come back with success.
430 # When self.placed == self.goal, we're done.
431 self.placed = set() # (peerid, shnum) tuples
        # we also keep a mapping from peerid to RemoteReference. Each time we
        # pull a connection out of the full peerlist, we add it to this for
        # use.
        self.connections = {}
438 self.bad_share_checkstrings = {}
440 # This is set at the last step of the publishing process.
441 self.versioninfo = ""
443 # we use the servermap to populate the initial goal: this way we will
444 # try to update each existing share in place.
445 for (peerid, shnum) in self._servermap.servermap:
446 self.goal.add( (peerid, shnum) )
447 self.connections[peerid] = self._servermap.connections[peerid]
448 # then we add in all the shares that were bad (corrupted, bad
449 # signatures, etc). We want to replace these.
        for key, old_checkstring in self._servermap.bad_shares.items():
            (peerid, shnum) = key
            self.goal.add( (peerid, shnum) )
            self.bad_share_checkstrings[key] = old_checkstring
454 self.connections[peerid] = self._servermap.connections[peerid]
        # TODO: Make this part do peer selection.
        self.update_goal()
        self.writers = {}
        if self._version == MDMF_VERSION:
            writer_class = MDMFSlotWriteProxy
        else:
            writer_class = SDMFSlotWriteProxy
464 # For each (peerid, shnum) in self.goal, we make a
465 # write proxy for that peer. We'll use this to write
466 # shares to the peer.
        for key in self.goal:
            peerid, shnum = key
            write_enabler = self._node.get_write_enabler(peerid)
            renew_secret = self._node.get_renewal_secret(peerid)
            cancel_secret = self._node.get_cancel_secret(peerid)
            secrets = (write_enabler, renew_secret, cancel_secret)
            self.writers[shnum] = writer_class(shnum, self.connections[peerid],
                                               self._storage_index, secrets,
                                               self._new_seqnum,
                                               self.required_shares,
                                               self.total_shares,
                                               self.segment_size, self.datalength)
            self.writers[shnum].peerid = peerid
484 if (peerid, shnum) in self._servermap.servermap:
485 old_versionid, old_timestamp = self._servermap.servermap[key]
486 (old_seqnum, old_root_hash, old_salt, old_segsize,
487 old_datalength, old_k, old_N, old_prefix,
488 old_offsets_tuple) = old_versionid
                self.writers[shnum].set_checkstring(old_seqnum,
                                                    old_root_hash,
                                                    old_salt)
492 elif (peerid, shnum) in self.bad_share_checkstrings:
493 old_checkstring = self.bad_share_checkstrings[(peerid, shnum)]
494 self.writers[shnum].set_checkstring(old_checkstring)
496 # Our remote shares will not have a complete checkstring until
497 # after we are done writing share data and have started to write
498 # blocks. In the meantime, we need to know what to look for when
499 # writing, so that we can detect UncoordinatedWriteErrors.
500 self._checkstring = self.writers.values()[0].get_checkstring()
502 # Now, we start pushing shares.
503 self._status.timings["setup"] = time.time() - self._started
504 # First, we encrypt, encode, and publish the shares that we need
505 # to encrypt, encode, and publish.
507 # This will eventually hold the block hash chain for each share
508 # that we publish. We define it this way so that empty publishes
509 # will still have something to write to the remote slot.
        self.blockhashes = dict([(i, []) for i in xrange(self.total_shares)])
        for i in xrange(self.total_shares):
            blocks = self.blockhashes[i]
            for j in xrange(self.num_segments):
                blocks.append(None)

        self.sharehash_leaves = None # eventually [sharehashes]
516 self.sharehashes = {} # shnum -> [sharehash leaves necessary to
517 # validate the share]
        self.log("Starting push")

        self._state = PUSHING_BLOCKS_STATE
        self._push()

        return self.done_deferred
    def _update_status(self):
        self._status.set_status("Sending Shares: %d placed out of %d, "
                                "%d messages outstanding" %
                                (len(self.placed),
                                 len(self.goal),
                                 len(self.outstanding)))
        self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
    def setup_encoding_parameters(self, offset=0):
        if self._version == MDMF_VERSION:
            segment_size = DEFAULT_MAX_SEGMENT_SIZE # 128 KiB by default
        else:
            segment_size = self.datalength # SDMF is only one segment
        # this must be a multiple of self.required_shares
        segment_size = mathutil.next_multiple(segment_size,
                                              self.required_shares)
        self.segment_size = segment_size

        # Calculate the starting segment for the upload.
        if segment_size:
            # We use div_ceil instead of integer division here because
            # it is semantically correct.
            # If datalength isn't an even multiple of segment_size, but
            # is larger than segment_size, datalength // segment_size
            # will be the largest number such that num <= datalength and
            # num % segment_size == 0. But that's not what we want,
            # because it ignores the extra data. div_ceil will give us
            # the right number of segments for the data that we're
            # given.
            self.num_segments = mathutil.div_ceil(self.datalength,
                                                  segment_size)

            self.starting_segment = offset // segment_size
        else:
            self.num_segments = 0
            self.starting_segment = 0
567 self.log("building encoding parameters for file")
568 self.log("got segsize %d" % self.segment_size)
569 self.log("got %d segments" % self.num_segments)
        if self._version == SDMF_VERSION:
            assert self.num_segments in (0, 1) # SDMF

        # calculate the tail segment size.
        if segment_size and self.datalength:
            self.tail_segment_size = self.datalength % segment_size
            self.log("got tail segment size %d" % self.tail_segment_size)
        else:
            self.tail_segment_size = 0

        if self.tail_segment_size == 0 and segment_size:
            # The tail segment is the same size as the other segments.
            self.tail_segment_size = segment_size
        fec = codec.CRSEncoder()
        fec.set_params(self.segment_size,
                       self.required_shares, self.total_shares)
        self.piece_size = fec.get_block_size()
        self.fec = fec
592 if self.tail_segment_size == self.segment_size:
593 self.tail_fec = self.fec
            tail_fec = codec.CRSEncoder()
            tail_fec.set_params(self.tail_segment_size,
                                self.required_shares,
                                self.total_shares)
            self.tail_fec = tail_fec
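
        # Illustrative sketch of how the encoders built here get used by
        # _encode_segment() below: each segment becomes k padded input
        # pieces of piece_size bytes, and encode() fires with one output
        # block per share:
        #
        #   d = self.fec.encode(crypttext_pieces)
        #   d.addCallback(lambda (shares, shareids): ...)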
601 self._current_segment = self.starting_segment
602 self.end_segment = self.num_segments - 1
603 # Now figure out where the last segment should be.
        if self.data.get_size() != self.datalength:
            # We're updating a few segments in the middle of a mutable
            # file, so we don't want to republish the whole thing.
            # (we don't have enough data to do that even if we wanted
            # to)
            end = self.data.get_size()
            self.end_segment = end // segment_size
            if end % segment_size == 0:
                self.end_segment -= 1
614 self.log("got start segment %d" % self.starting_segment)
615 self.log("got end segment %d" % self.end_segment)
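
        # Worked example with illustrative numbers: datalength=1000000,
        # required_shares=3, MDMF. segment_size rounds up from 131072 to
        # 131073 (a multiple of 3), num_segments = div_ceil(1000000,
        # 131073) = 8, and tail_segment_size = 1000000 % 131073 = 82489.
        # For a full overwrite, starting_segment is 0 and end_segment is 7.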
    def _push(self, ignored=None):
        """
        I manage state transitions. In particular, I see that we still
        have a good enough number of writers to complete the upload
        successfully.
        """
        # Can we still successfully publish this file?
        # TODO: Keep track of outstanding queries before aborting the
        # process.
        if len(self.writers) < self.required_shares or self.surprised:
            return self._failure()
630 # Figure out what we need to do next. Each of these needs to
631 # return a deferred so that we don't block execution when this
632 # is first called in the upload method.
633 if self._state == PUSHING_BLOCKS_STATE:
634 return self.push_segment(self._current_segment)
636 elif self._state == PUSHING_EVERYTHING_ELSE_STATE:
637 return self.push_everything_else()
        # If we make it to this point, we were successful in placing the
        # file.
        return self._done()
644 def push_segment(self, segnum):
645 if self.num_segments == 0 and self._version == SDMF_VERSION:
646 self._add_dummy_salts()
648 if segnum > self.end_segment:
649 # We don't have any more segments to push.
            self._state = PUSHING_EVERYTHING_ELSE_STATE
            return self._push()

        d = self._encode_segment(segnum)
654 d.addCallback(self._push_segment, segnum)
655 def _increment_segnum(ign):
656 self._current_segment += 1
657 # XXX: I don't think we need to do addBoth here -- any errBacks
658 # should be handled within push_segment.
659 d.addCallback(_increment_segnum)
660 d.addCallback(self._turn_barrier)
661 d.addCallback(self._push)
662 d.addErrback(self._failure)
    def _turn_barrier(self, result):
        """
        I help the publish process avoid the recursion limit issues
        described in #237.
        """
        return fireEventually(result)
    def _add_dummy_salts(self):
        """
        SDMF files need a salt even if they're empty, or the signature
        won't make sense. This method adds a dummy salt to each of our
        SDMF writers so that they can write the signature later.
        """
        salt = os.urandom(16)
        assert self._version == SDMF_VERSION
682 for writer in self.writers.itervalues():
683 writer.put_salt(salt)
    def _encode_segment(self, segnum):
        """
        I encrypt and encode the segment segnum.
        """
        started = time.time()

        if segnum + 1 == self.num_segments:
            segsize = self.tail_segment_size
        else:
            segsize = self.segment_size
698 self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
        data = self.data.read(segsize)
        # XXX: This is dumb. Why return a list?
        data = "".join(data)

        assert len(data) == segsize, len(data)
        salt = os.urandom(16)
        key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
        self._status.set_status("Encrypting")
        enc = AES(key)
        crypttext = enc.process(data)
        assert len(crypttext) == len(data)

        now = time.time()
        self._status.timings["encrypt"] = now - started
        started = now

        # now apply FEC
        if segnum + 1 == self.num_segments:
            fec = self.tail_fec
        else:
            fec = self.fec
        self._status.set_status("Encoding")
        crypttext_pieces = [None] * self.required_shares
725 piece_size = fec.get_block_size()
726 for i in range(len(crypttext_pieces)):
727 offset = i * piece_size
728 piece = crypttext[offset:offset+piece_size]
729 piece = piece + "\x00"*(piece_size - len(piece)) # padding
730 crypttext_pieces[i] = piece
731 assert len(piece) == piece_size
732 d = fec.encode(crypttext_pieces)
        def _done_encoding(res):
            elapsed = time.time() - started
            self._status.timings["encode"] = elapsed
            return res
        d.addCallback(_done_encoding)
        d.addCallback(lambda shares_and_shareids:
                      (shares_and_shareids, salt))
        return d
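
    # Illustrative padding example for _encode_segment, with hypothetical
    # numbers: segsize=10 and k=3 give piece_size=4, so the crypttext is
    # split at offsets 0, 4, and 8; the final 2-byte piece is padded with
    # two NUL bytes so all k FEC inputs are exactly piece_size long.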
    def _push_segment(self, encoded_and_salt, segnum):
        """
        I push (data, salt) as segment number segnum.
        """
        results, salt = encoded_and_salt
746 shares, shareids = results
747 self._status.set_status("Pushing segment")
748 for i in xrange(len(shares)):
749 sharedata = shares[i]
750 shareid = shareids[i]
            if self._version == MDMF_VERSION:
                hashed = salt + sharedata
            else:
                hashed = sharedata
            block_hash = hashutil.block_hash(hashed)
756 self.blockhashes[shareid][segnum] = block_hash
757 # find the writer for this share
758 writer = self.writers[shareid]
759 writer.put_block(sharedata, segnum, salt)
    def push_everything_else(self):
        """
        I put everything else associated with a share.
        """
        self._pack_started = time.time()
767 self.push_encprivkey()
768 self.push_blockhashes()
769 self.push_sharehashes()
770 self.push_toplevel_hashes_and_signature()
771 d = self.finish_publishing()
772 def _change_state(ignored):
773 self._state = DONE_STATE
        d.addCallback(_change_state)
        d.addCallback(self._push)
        return d
779 def push_encprivkey(self):
780 encprivkey = self._encprivkey
781 self._status.set_status("Pushing encrypted private key")
782 for writer in self.writers.itervalues():
783 writer.put_encprivkey(encprivkey)
786 def push_blockhashes(self):
787 self.sharehash_leaves = [None] * len(self.blockhashes)
788 self._status.set_status("Building and pushing block hash tree")
789 for shnum, blockhashes in self.blockhashes.iteritems():
790 t = hashtree.HashTree(blockhashes)
791 self.blockhashes[shnum] = list(t)
792 # set the leaf for future use.
793 self.sharehash_leaves[shnum] = t[0]
795 writer = self.writers[shnum]
796 writer.put_blockhashes(self.blockhashes[shnum])
799 def push_sharehashes(self):
800 self._status.set_status("Building and pushing share hash chain")
801 share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
802 for shnum in xrange(len(self.sharehash_leaves)):
803 needed_indices = share_hash_tree.needed_hashes(shnum)
804 self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
805 for i in needed_indices] )
806 writer = self.writers[shnum]
807 writer.put_sharehashes(self.sharehashes[shnum])
808 self.root_hash = share_hash_tree[0]
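
    # Illustrative sketch of the two-level Merkle structure built by the
    # two methods above: each share's block hashes form a per-share
    # HashTree whose root (t[0]) becomes one leaf of the share hash tree;
    # needed_hashes(shnum) selects the minimal chain a downloader needs
    # to validate that one leaf, and share_hash_tree[0] is the root that
    # _make_and_place_signature() ultimately signs.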
811 def push_toplevel_hashes_and_signature(self):
        # We need to do three things here:
813 # - Push the root hash and salt hash
814 # - Get the checkstring of the resulting layout; sign that.
815 # - Push the signature
816 self._status.set_status("Pushing root hashes and signature")
817 for shnum in xrange(self.total_shares):
818 writer = self.writers[shnum]
819 writer.put_root_hash(self.root_hash)
820 self._update_checkstring()
821 self._make_and_place_signature()
    def _update_checkstring(self):
        """
        After putting the root hash, MDMF files will have the
        checkstring written to the storage server. This means that we
        can update our copy of the checkstring so we can detect
        uncoordinated writes. SDMF files will have the same checkstring,
        so we need not do anything.
        """
        self._checkstring = self.writers.values()[0].get_checkstring()
    def _make_and_place_signature(self):
        """
        I create and place the signature.
        """
        started = time.time()
        self._status.set_status("Signing prefix")
        signable = self.writers.values()[0].get_signable()
842 self.signature = self._privkey.sign(signable)
844 for (shnum, writer) in self.writers.iteritems():
845 writer.put_signature(self.signature)
846 self._status.timings['sign'] = time.time() - started
    def finish_publishing(self):
        # We're almost done -- we just need to put the verification key
        # and the signature
        started = time.time()
        self._status.set_status("Pushing shares")
        self._started_pushing = started
        ds = []
        verification_key = self._pubkey.serialize()
859 # TODO: Bad, since we remove from this same dict. We need to
860 # make a copy, or just use a non-iterated value.
861 for (shnum, writer) in self.writers.iteritems():
862 writer.put_verification_key(verification_key)
863 d = writer.finish_publishing()
864 # Add the (peerid, shnum) tuple to our list of outstanding
865 # queries. This gets used by _loop if some of our queries
866 # fail to place shares.
867 self.outstanding.add((writer.peerid, writer.shnum))
868 d.addCallback(self._got_write_answer, writer, started)
            d.addErrback(self._connection_problem, writer)
            ds.append(d)

        self._record_verinfo()
872 self._status.timings['pack'] = time.time() - started
873 return defer.DeferredList(ds)
876 def _record_verinfo(self):
877 self.versioninfo = self.writers.values()[0].get_verinfo()
    def _connection_problem(self, f, writer):
        """
        We ran into a connection problem while working with writer, and
        need to deal with that.
        """
        self.log("found problem: %s" % str(f))
        self._last_failure = f
        del self.writers[writer.shnum]
    def log_goal(self, goal, message=""):
        logmsg = []
        for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]):
            logmsg.append("sh%d to [%s]" % (shnum,
                                            idlib.shortnodeid_b2a(peerid)))
        self.log("current goal: %s%s" % (message, ", ".join(logmsg)),
                 level=log.NOISY)
        self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
                 level=log.NOISY)
    def update_goal(self):
        # if log.recording_noisy
        if True:
            self.log_goal(self.goal, "before update: ")
904 # first, remove any bad peers from our goal
905 self.goal = set([ (peerid, shnum)
906 for (peerid, shnum) in self.goal
907 if peerid not in self.bad_peers ])
909 # find the homeless shares:
910 homefull_shares = set([shnum for (peerid, shnum) in self.goal])
911 homeless_shares = set(range(self.total_shares)) - homefull_shares
912 homeless_shares = sorted(list(homeless_shares))
913 # place them somewhere. We prefer unused servers at the beginning of
914 # the available peer list.
        if not homeless_shares:
            return
919 # if an old share X is on a node, put the new share X there too.
920 # TODO: 1: redistribute shares to achieve one-per-peer, by copying
921 # shares from existing peers to new (less-crowded) ones. The
922 # old shares must still be updated.
        # TODO: 2: move those shares instead of copying them, to reduce future
        # update work.
926 # this is a bit CPU intensive but easy to analyze. We create a sort
927 # order for each peerid. If the peerid is marked as bad, we don't
928 # even put them in the list. Then we care about the number of shares
929 # which have already been assigned to them. After that we care about
930 # their permutation order.
931 old_assignments = DictOfSets()
932 for (peerid, shnum) in self.goal:
933 old_assignments.add(peerid, shnum)
        peerlist = []
        for i, (peerid, ss) in enumerate(self.full_peerlist):
            if peerid in self.bad_peers:
                continue
            entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
            peerlist.append(entry)
        peerlist.sort()

        if not peerlist:
            raise NotEnoughServersError("Ran out of non-bad servers, "
                                        "first_error=%s" %
                                        str(self._first_write_error),
                                        self._first_write_error)
        # we then index this peerlist with an integer, because we may have to
        # wrap. We update the goal as we go.
        i = 0
        for shnum in homeless_shares:
            (ignored1, ignored2, peerid, ss) = peerlist[i]
954 # if we are forced to send a share to a server that already has
955 # one, we may have two write requests in flight, and the
956 # servermap (which was computed before either request was sent)
957 # won't reflect the new shares, so the second response will be
958 # surprising. There is code in _got_write_answer() to tolerate
959 # this, otherwise it would cause the publish to fail with an
960 # UncoordinatedWriteError. See #546 for details of the trouble
961 # this used to cause.
962 self.goal.add( (peerid, shnum) )
            self.connections[peerid] = ss
            i += 1
            if i >= len(peerlist):
                i = 0
        if True:
            self.log_goal(self.goal, "after update: ")
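
    # Illustrative sketch of the sort order used above (hypothetical
    # values): each candidate is keyed by (shares already assigned,
    # permuted-list position), so sorted() prefers empty servers that
    # appear earliest in the permuted peerlist:
    #
    #   peerlist = [(2, 0, peerA, ssA), (0, 1, peerB, ssB)]
    #   peerlist.sort()   # peerB sorts first: no shares assigned yet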
    def _got_write_answer(self, answer, writer, started):
        if not answer:
            # SDMF writers only pretend to write when readers set their
            # blocks, salts, and so on -- they actually just write once,
            # at the end of the upload process. In fake writes, they
            # return defer.succeed(None). If we see that, we shouldn't
            # bother checking it.
            return
980 peerid = writer.peerid
981 lp = self.log("_got_write_answer from %s, share %d" %
                      (idlib.shortnodeid_b2a(peerid), writer.shnum))

        now = time.time()
        elapsed = now - started

        self._status.add_per_server_time(peerid, elapsed)
989 wrote, read_data = answer
991 surprise_shares = set(read_data.keys()) - set([writer.shnum])
993 # We need to remove from surprise_shares any shares that we are
994 # knowingly also writing to that peer from other writers.
996 # TODO: Precompute this.
997 known_shnums = [x.shnum for x in self.writers.values()
998 if x.peerid == peerid]
999 surprise_shares -= set(known_shnums)
1000 self.log("found the following surprise shares: %s" %
1001 str(surprise_shares))
        # Now surprise_shares contains all of the shares that we did not
        # expect to be there.

        surprised = False
        for shnum in surprise_shares:
1008 # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
1009 checkstring = read_data[shnum][0]
1010 # What we want to do here is to see if their (seqnum,
1011 # roothash, salt) is the same as our (seqnum, roothash,
1012 # salt), or the equivalent for MDMF. The best way to do this
1013 # is to store a packed representation of our checkstring
            # somewhere, then not bother unpacking the other one at all.
            if checkstring == self._checkstring:
                # they have the right share, somehow

                if (peerid,shnum) in self.goal:
                    # and we want them to have it, so we probably sent them a
                    # copy in an earlier write. This is ok, and avoids the
                    # #546 problem.
                    continue

                # They aren't in our goal, but they are still for the right
                # version. Somebody else wrote them, and it's a convergent
                # uncoordinated write. Pretend this is ok (don't be
                # surprised), since I suspect there's a decent chance that
                # we'll hit this in normal operation.
                continue

            else:
                # the new shares are of a different version
                if peerid in self._servermap.reachable_peers:
                    # we asked them about their shares, so we had knowledge
                    # of what they used to have. Any surprising shares must
                    # have come from someone else, so UCW.
                    surprised = True
                else:
                    # we didn't ask them, and now we've discovered that they
                    # have a share we didn't know about. This indicates that
                    # mapupdate should have worked harder and asked more
                    # servers before concluding that it knew about them all.

                    # signal UCW, but make sure to ask this peer next time,
                    # so we'll remember to update it if/when we retry.
                    surprised = True

                    # TODO: ask this peer next time. I don't yet have a good
                    # way to do this. Two insufficient possibilities are:
1051 # self._servermap.add_new_share(peerid, shnum, verinfo, now)
1052 # but that requires fetching/validating/parsing the whole
1053 # version string, and all we have is the checkstring
1054 # self._servermap.mark_bad_share(peerid, shnum, checkstring)
1055 # that will make publish overwrite the share next time,
1056 # but it won't re-query the server, and it won't make
1057 # mapupdate search further
1059 # TODO later: when publish starts, do
1060 # servermap.get_best_version(), extract the seqnum,
1061 # subtract one, and store as highest-replaceable-seqnum.
1062 # Then, if this surprise-because-we-didn't-ask share is
1063 # of highest-replaceable-seqnum or lower, we're allowed
1064 # to replace it: send out a new writev (or rather add it
                    # to self.goal and loop).

        if surprised:
            self.log("they had shares %s that we didn't know about" %
                     (list(surprise_shares),),
                     parent=lp, level=log.WEIRD, umid="un9CSQ")
            self.surprised = True

        if not wrote:
1077 # TODO: there are two possibilities. The first is that the server
1078 # is full (or just doesn't want to give us any room), which means
1079 # we shouldn't ask them again, but is *not* an indication of an
1080 # uncoordinated write. The second is that our testv failed, which
1081 # *does* indicate an uncoordinated write. We currently don't have
1082 # a way to tell these two apart (in fact, the storage server code
1083 # doesn't have the option of refusing our share).
            # If the server is full, mark the peer as bad (so we don't ask
            # them again), but don't set self.surprised. The loop() will find
            # a new home for the share.

            # If the testv failed, log it, set self.surprised, but don't
            # bother adding to self.bad_peers.
1092 self.log("our testv failed, so the write did not happen",
1093 parent=lp, level=log.WEIRD, umid="8sc26g")
1094 self.surprised = True
1095 self.bad_peers.add(writer) # don't ask them again
1096 # use the checkstring to add information to the log message
            for (shnum,readv) in read_data.items():
                checkstring = readv[0]
                (other_seqnum,
                 other_roothash,
                 other_salt) = unpack_checkstring(checkstring)
                expected_version = self._servermap.version_on_peer(peerid,
                                                                   shnum)
1104 if expected_version:
1105 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
1106 offsets_tuple) = expected_version
1107 self.log("somebody modified the share on us:"
1108 " shnum=%d: I thought they had #%d:R=%s,"
1109 " but testv reported #%d:R=%s" %
                             (shnum,
                              seqnum, base32.b2a(root_hash)[:4],
1112 other_seqnum, base32.b2a(other_roothash)[:4]),
1113 parent=lp, level=log.NOISY)
1114 # if expected_version==None, then we didn't expect to see a
1115 # share on that peer, and the 'surprise_shares' clause above
1116 # will have logged it.
1119 # and update the servermap
1120 # self.versioninfo is set during the last phase of publishing.
1121 # If we get there, we know that responses correspond to placed
1122 # shares, and can safely execute these statements.
1123 if self.versioninfo:
1124 self.log("wrote successfully: adding new share to servermap")
1125 self._servermap.add_new_share(peerid, writer.shnum,
1126 self.versioninfo, started)
1127 self.placed.add( (peerid, writer.shnum) )
1128 self._update_status()
1129 # the next method in the deferred chain will check to see if
1130 # we're done and successful.
    def _done(self):
        if not self._running:
            return
        self._running = False
        now = time.time()
        self._status.timings["total"] = now - self._started
1141 elapsed = now - self._started_pushing
1142 self._status.timings['push'] = elapsed
1144 self._status.set_active(False)
1145 self.log("Publish done, success")
1146 self._status.set_status("Finished")
1147 self._status.set_progress(1.0)
        # Get k and segsize, then give them to the caller.
        hints = {}
        hints['segsize'] = self.segment_size
        hints['k'] = self.required_shares
1152 self._node.set_downloader_hints(hints)
1153 eventually(self.done_deferred.callback, None)
    def _failure(self, f=None):
        if f:
            self._last_failure = f

        if not self.surprised:
1160 # We ran out of servers
1161 msg = "Publish ran out of good servers"
1162 if self._last_failure:
1163 msg += ", last failure was: %s" % str(self._last_failure)
            e = NotEnoughServersError(msg)
        else:
            # We ran into shares that we didn't recognize, which means
1169 # that we need to return an UncoordinatedWriteError.
1170 self.log("Publish failed with UncoordinatedWriteError")
1171 e = UncoordinatedWriteError()
1172 f = failure.Failure(e)
1173 eventually(self.done_deferred.callback, f)
class MutableFileHandle:
    """
    I am a mutable uploadable built around a filehandle-like object,
    usually either a StringIO instance or a handle to an actual file.
    """
    implements(IMutableUploadable)
1183 def __init__(self, filehandle):
1184 # The filehandle is defined as a generally file-like object that
1185 # has these two methods. We don't care beyond that.
1186 assert hasattr(filehandle, "read")
1187 assert hasattr(filehandle, "close")
1189 self._filehandle = filehandle
1190 # We must start reading at the beginning of the file, or we risk
1191 # encountering errors when the data read does not match the size
1192 # reported to the uploader.
        self._filehandle.seek(0)

        # We have not yet read anything, so our position is 0.
        self._marker = 0

    def get_size(self):
        """I return the amount of data in my filehandle."""
        if not hasattr(self, "_size"):
            old_position = self._filehandle.tell()
            # Seek to the end of the file by seeking 0 bytes from the
            # end of the file.
            self._filehandle.seek(0, 2) # 2 == os.SEEK_END in 2.5+
            self._size = self._filehandle.tell()
            # Restore the previous position, in case this was called
            # after a read.
            self._filehandle.seek(old_position)
            assert self._filehandle.tell() == old_position

        assert hasattr(self, "_size")
        return self._size
    def pos(self):
        """I return the position of my read marker -- i.e., how much data I
        have already read and returned to callers.
        """
        return self._marker

    def read(self, length):
        """
        I return some data (up to length bytes) from my filehandle.

        In most cases, I return length bytes, but sometimes I won't --
        for example, if I am asked to read beyond the end of a file, or
        an error occurs.
        """
        results = self._filehandle.read(length)
        self._marker += len(results)
        return [results]
    def close(self):
        """
        I close the underlying filehandle. Any further operations on the
        filehandle fail at this point.
        """
        self._filehandle.close()
class MutableData(MutableFileHandle):
    """
    I am a mutable uploadable built around a string, which I then cast
    into a StringIO and treat as a filehandle.
    """
1253 def __init__(self, s):
1254 # Take a string and return a file-like uploadable.
1255 assert isinstance(s, str)
1257 MutableFileHandle.__init__(self, StringIO(s))
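

# Illustrative sketch (not part of the original module): draining an
# IMutableUploadable such as MutableData. Note that read() returns a
# *list* of strings (see the XXX in _encode_segment above), so the
# chunks are flattened before joining. The function name is hypothetical.
def _example_drain(uploadable, chunk_size=7):
    pieces = []
    while uploadable.pos() < uploadable.get_size():
        pieces.extend(uploadable.read(chunk_size))
    return "".join(pieces)

# e.g. _example_drain(MutableData("0123456789" * 3)) == "0123456789" * 3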
class TransformingUploadable:
    """
    I am an IMutableUploadable that wraps another IMutableUploadable,
    and some segments that are already on the grid. When I am called to
    read, I handle merging of boundary segments.
    """
    implements(IMutableUploadable)
1269 def __init__(self, data, offset, segment_size, start, end):
1270 assert IMutableUploadable.providedBy(data)
        self._newdata = data
        self._offset = offset
        self._segment_size = segment_size
        self._start = start
        self._end = end

        self._read_marker = 0
1280 self._first_segment_offset = offset % segment_size
1282 num = self.log("TransformingUploadable: starting", parent=None)
1283 self._log_number = num
1284 self.log("got fso: %d" % self._first_segment_offset)
1285 self.log("got offset: %d" % self._offset)
1288 def log(self, *args, **kwargs):
1289 if 'parent' not in kwargs:
1290 kwargs['parent'] = self._log_number
1291 if "facility" not in kwargs:
1292 kwargs["facility"] = "tahoe.mutable.transforminguploadable"
1293 return log.msg(*args, **kwargs)
    def get_size(self):
        return self._offset + self._newdata.get_size()
1300 def read(self, length):
1301 # We can get data from 3 sources here.
1302 # 1. The first of the segments provided to us.
1303 # 2. The data that we're replacing things with.
1304 # 3. The last of the segments provided to us.
        # are we in state 0?
        self.log("reading %d bytes" % length)

        old_start_data = ""
        old_data_length = self._first_segment_offset - self._read_marker
        if old_data_length > 0:
            if old_data_length > length:
                old_data_length = length
            self.log("returning %d bytes of old start data" % old_data_length)

            old_data_end = old_data_length + self._read_marker
            old_start_data = self._start[self._read_marker:old_data_end]
            length -= old_data_length
        else:
            # otherwise calculations later get screwed up.
            old_data_length = 0
1323 # Is there enough new data to satisfy this read? If not, we need
1324 # to pad the end of the data with data from our last segment.
        old_end_length = length - \
            (self._newdata.get_size() - self._newdata.pos())
        old_end_data = ""
        if old_end_length > 0:
1329 self.log("reading %d bytes of old end data" % old_end_length)
1331 # TODO: We're not explicitly checking for tail segment size
1332 # here. Is that a problem?
1333 old_data_offset = (length - old_end_length + \
1334 old_data_length) % self._segment_size
1335 self.log("reading at offset %d" % old_data_offset)
1336 old_end = old_data_offset + old_end_length
1337 old_end_data = self._end[old_data_offset:old_end]
1338 length -= old_end_length
1339 assert length == self._newdata.get_size() - self._newdata.pos()
1341 self.log("reading %d bytes of new data" % length)
1342 new_data = self._newdata.read(length)
1343 new_data = "".join(new_data)
1345 self._read_marker += len(old_start_data + new_data + old_end_data)
1347 return old_start_data + new_data + old_end_data
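

# Illustrative sketch of TransformingUploadable's boundary merging, with
# hypothetical data (not part of the original module): overwriting 7
# bytes at offset 5 of a 20-byte file made of 10-byte segments. 'start'
# and 'end' are the old first and last segments touched by the write, as
# the update process would have fetched them.
def _example_transforming_read():
    old_first = "A" * 10           # old segment 0
    old_last = "B" * 10            # old segment 1
    new = MutableData("x" * 7)     # replacement bytes for offsets 5..11
    t = TransformingUploadable(new, 5, 10, old_first, old_last)
    # get_size() == offset + new data size == 12; the first five bytes
    # come from the old segment, the remaining seven from the new data.
    return t.read(t.get_size())    # "AAAAA" + "xxxxxxx"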