4 from StringIO import StringIO
5 from itertools import count
6 from zope.interface import implements
7 from twisted.internet import defer
8 from twisted.python import failure
9 from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION, \
11 from allmydata.util import base32, hashutil, mathutil, idlib, log
12 from allmydata.util.dictutil import DictOfSets
13 from allmydata import hashtree, codec
14 from allmydata.storage.server import si_b2a
15 from pycryptopp.cipher.aes import AES
16 from foolscap.api import eventually, fireEventually
18 from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, \
19 UncoordinatedWriteError, NotEnoughServersError
20 from allmydata.mutable.servermap import ServerMap
21 from allmydata.mutable.layout import unpack_checkstring, MDMFSlotWriteProxy, \
25 DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
26 PUSHING_BLOCKS_STATE = 0
27 PUSHING_EVERYTHING_ELSE_STATE = 1
31 implements(IPublishStatus)
32 statusid_counter = count(0)
35 self.timings["send_per_server"] = {}
36 self.timings["encrypt"] = 0.0
37 self.timings["encode"] = 0.0
41 self.storage_index = None
43 self.encoding = ("?", "?")
45 self.status = "Not started"
47 self.counter = self.statusid_counter.next()
48 self.started = time.time()
50 def add_per_server_time(self, peerid, elapsed):
51 if peerid not in self.timings["send_per_server"]:
52 self.timings["send_per_server"][peerid] = []
53 self.timings["send_per_server"][peerid].append(elapsed)
54 def accumulate_encode_time(self, elapsed):
55 self.timings["encode"] += elapsed
56 def accumulate_encrypt_time(self, elapsed):
57 self.timings["encrypt"] += elapsed
59 def get_started(self):
61 def get_storage_index(self):
62 return self.storage_index
63 def get_encoding(self):
65 def using_helper(self):
67 def get_servermap(self):
73 def get_progress(self):
77 def get_counter(self):
80 def set_storage_index(self, si):
81 self.storage_index = si
82 def set_helper(self, helper):
84 def set_servermap(self, servermap):
85 self.servermap = servermap
86 def set_encoding(self, k, n):
87 self.encoding = (k, n)
88 def set_size(self, size):
90 def set_status(self, status):
92 def set_progress(self, value):
94 def set_active(self, value):
97 class LoopLimitExceededError(Exception):
101 """I represent a single act of publishing the mutable file to the grid. I
102 will only publish my data if the servermap I am using still represents
103 the current state of the world.
105 To make the initial publish, set servermap to None.
108 def __init__(self, filenode, storage_broker, servermap):
109 self._node = filenode
110 self._storage_broker = storage_broker
111 self._servermap = servermap
112 self._storage_index = self._node.get_storage_index()
113 self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
114 num = self.log("Publish(%s): starting" % prefix, parent=None)
115 self._log_number = num
117 self._first_write_error = None
118 self._last_failure = None
120 self._status = PublishStatus()
121 self._status.set_storage_index(self._storage_index)
122 self._status.set_helper(False)
123 self._status.set_progress(0.0)
124 self._status.set_active(True)
125 self._version = self._node.get_version()
126 assert self._version in (SDMF_VERSION, MDMF_VERSION)
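# A rough usage sketch (the driving code is not part of this module; it
# is assumed, not quoted). The servermap must have been refreshed with a
# MODE_WRITE (or MODE_CHECK) mapupdate before we get here:
#
#     p = Publish(filenode, storage_broker, servermap)
#     d = p.publish(MutableData("new contents"))  # any IMutableUploadable
#     # d fires with None on success, or with a Failure such as
#     # UncoordinatedWriteError if the grid changed underneath us.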
129 def get_status(self):
132 def log(self, *args, **kwargs):
133 if 'parent' not in kwargs:
134 kwargs['parent'] = self._log_number
135 if "facility" not in kwargs:
136 kwargs["facility"] = "tahoe.mutable.publish"
137 return log.msg(*args, **kwargs)
140 def update(self, data, offset, blockhashes, version):
142 I replace the contents of this file with the contents of data,
143 starting at offset. I return a Deferred that fires with None
144 when the replacement has been completed, or with an error if
145 something went wrong during the process.
147 Note that this process will not upload new shares. If the file
148 being updated is in need of repair, callers will have to repair
152 # 1: Make peer assignments. We'll assign each share that we know
153 # about on the grid to the peer that currently holds that
154 # share, and will not place any new shares.
155 # 2: Set up encoding parameters. Most of these will stay the same
156 # -- datalength will change, as will some of the offsets.
157 # 3. Upload the new segments.
159 assert IMutableUploadable.providedBy(data)
163 # XXX: Use the MutableFileVersion instead.
164 self.datalength = self._node.get_size()
165 if data.get_size() > self.datalength:
166 self.datalength = data.get_size()
168 self.log("starting update")
169 self.log("adding new data of length %d at offset %d" % \
170 (data.get_size(), offset))
171 self.log("new data length is %d" % self.datalength)
172 self._status.set_size(self.datalength)
173 self._status.set_status("Started")
174 self._started = time.time()
176 self.done_deferred = defer.Deferred()
178 self._writekey = self._node.get_writekey()
179 assert self._writekey, "need write capability to publish"
181 # first, which servers will we publish to? We require that the
182 # servermap was updated in MODE_WRITE, so we can depend upon the
183 # peerlist computed by that process instead of computing our own.
184 assert self._servermap
185 assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
186 # we will push a version that is one larger than anything present
187 # in the grid, according to the servermap.
188 self._new_seqnum = self._servermap.highest_seqnum() + 1
189 self._status.set_servermap(self._servermap)
191 self.log(format="new seqnum will be %(seqnum)d",
192 seqnum=self._new_seqnum, level=log.NOISY)
194 # We're updating an existing file, so all of the following
195 # should be available.
196 self.readkey = self._node.get_readkey()
197 self.required_shares = self._node.get_required_shares()
198 assert self.required_shares is not None
199 self.total_shares = self._node.get_total_shares()
200 assert self.total_shares is not None
201 self._status.set_encoding(self.required_shares, self.total_shares)
203 self._pubkey = self._node.get_pubkey()
205 self._privkey = self._node.get_privkey()
207 self._encprivkey = self._node.get_encprivkey()
209 sb = self._storage_broker
210 full_peerlist = [(s.get_serverid(), s.get_rref())
211 for s in sb.get_servers_for_psi(self._storage_index)]
212 self.full_peerlist = full_peerlist # for use later, immutable
213 self.bad_peers = set() # peerids who have errbacked/refused requests
215 # This will set self.segment_size, self.num_segments, and
216 # self.fec. TODO: Does it know how to do the offset? Probably
217 # not. So do that part next.
218 self.setup_encoding_parameters(offset=offset)
220 # if we experience any surprises (writes which were rejected because
221 # our test vector did not match, or shares which we didn't expect to
222 # see), we set this flag and report an UncoordinatedWriteError at the
223 # end of the publish process.
224 self.surprised = False
226 # we keep track of three tables. The first is our goal: which share
227 # we want to see on which servers. This is initially populated by the
228 # existing servermap.
229 self.goal = set() # pairs of (peerid, shnum) tuples
231 # the second table is our list of outstanding queries: those which
232 # are in flight and may or may not be delivered, accepted, or
233 # acknowledged. Items are added to this table when the request is
234 # sent, and removed when the response returns (or errbacks).
235 self.outstanding = set() # (peerid, shnum) tuples
237 # the third is a table of successes: shares which have actually been
238 # placed. These are populated when responses come back with success.
239 # When self.placed == self.goal, we're done.
240 self.placed = set() # (peerid, shnum) tuples
242 # we also keep a mapping from peerid to RemoteReference. Each time we
243 # pull a connection out of the full peerlist, we add it to this for
245 self.connections = {}
247 self.bad_share_checkstrings = {}
249 # This is set at the last step of the publishing process.
250 self.versioninfo = ""
252 # we use the servermap to populate the initial goal: this way we will
253 # try to update each existing share in place. Since we're
254 # updating, we ignore damaged and missing shares -- callers must
255 # do a repair to repair and recreate these.
256 for (peerid, shnum) in self._servermap.servermap:
257 self.goal.add( (peerid, shnum) )
258 self.connections[peerid] = self._servermap.connections[peerid]
261 # SDMF files are updated differently.
262 self._version = MDMF_VERSION
263 writer_class = MDMFSlotWriteProxy
265 # For each (peerid, shnum) in self.goal, we make a
266 # write proxy for that peer. We'll use this to write
267 # shares to the peer.
268 for key in self.goal:
270 write_enabler = self._node.get_write_enabler(peerid)
271 renew_secret = self._node.get_renewal_secret(peerid)
272 cancel_secret = self._node.get_cancel_secret(peerid)
273 secrets = (write_enabler, renew_secret, cancel_secret)
275 self.writers[shnum] = writer_class(shnum,
276 self.connections[peerid],
280 self.required_shares,
284 self.writers[shnum].peerid = peerid
285 assert (peerid, shnum) in self._servermap.servermap
286 old_versionid, old_timestamp = self._servermap.servermap[key]
287 (old_seqnum, old_root_hash, old_salt, old_segsize,
288 old_datalength, old_k, old_N, old_prefix,
289 old_offsets_tuple) = old_versionid
290 self.writers[shnum].set_checkstring(old_seqnum,
294 # Our remote shares will not have a complete checkstring until
295 # after we are done writing share data and have started to write
296 # blocks. In the meantime, we need to know what to look for when
297 # writing, so that we can detect UncoordinatedWriteErrors.
298 self._checkstring = self.writers.values()[0].get_checkstring()
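# Note (illustrative): the checkstring is the packed share prefix that
# encodes (seqnum, root_hash, salt). We keep the packed bytes rather than
# the unpacked fields so that _got_write_answer() can compare what the
# servers report against ours with a plain string comparison;
# unpack_checkstring() (used there) recovers the tuple when we need it
# for logging.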
300 # Now, we start pushing shares.
301 self._status.timings["setup"] = time.time() - self._started
302 # First, we encrypt and encode each segment of the new data, then
303 # push the resulting blocks out to the writers.
305 # Our update process fetched these for us. We need to update
306 # them in place as publishing happens.
307 self.blockhashes = {} # (shnum, [blockhashes])
308 for (i, bht) in blockhashes.iteritems():
309 # We need to extract the leaves from our old hash tree.
310 old_segcount = mathutil.div_ceil(version[4],
312 h = hashtree.IncompleteHashTree(old_segcount)
313 bht = dict(enumerate(bht))
315 leaves = h[h.get_leaf_index(0):]
316 for j in xrange(self.num_segments - len(leaves)):
319 assert len(leaves) >= self.num_segments
320 self.blockhashes[i] = leaves
321 # This list will now be the leaves that were set during the
322 # initial upload + enough empty hashes to make it a
323 # power-of-two. If we exceed a power of two boundary, we
324 # should be encoding the file over again, and should not be
326 #assert len(self.blockhashes[i]) == \
327 # hashtree.roundup_pow2(self.num_segments), \
328 # len(self.blockhashes[i])
329 # XXX: Except this doesn't work. Figure out why.
331 # These are filled in later, after we've modified the block hash
333 self.sharehash_leaves = None # eventually [sharehashes]
334 self.sharehashes = {} # shnum -> [sharehash leaves necessary to
335 # validate the share]
337 self.log("Starting push")
339 self._state = PUSHING_BLOCKS_STATE
342 return self.done_deferred
345 def publish(self, newdata):
346 """Publish the filenode's current contents. Returns a Deferred that
347 fires (with None) when the publish has done as much work as it's ever
348 going to do, or errbacks with ConsistencyError if it detects a
352 # 0. Set up encoding parameters, encoder, and other such things.
353 # 1. Encrypt, encode, and publish segments.
354 assert IMutableUploadable.providedBy(newdata)
357 self.datalength = newdata.get_size()
358 #if self.datalength >= DEFAULT_MAX_SEGMENT_SIZE:
359 # self._version = MDMF_VERSION
361 # self._version = SDMF_VERSION
363 self.log("starting publish, datalen is %s" % self.datalength)
364 self._status.set_size(self.datalength)
365 self._status.set_status("Started")
366 self._started = time.time()
368 self.done_deferred = defer.Deferred()
370 self._writekey = self._node.get_writekey()
371 assert self._writekey, "need write capability to publish"
373 # first, which servers will we publish to? We require that the
374 # servermap was updated in MODE_WRITE, so we can depend upon the
375 # peerlist computed by that process instead of computing our own.
377 assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
378 # we will push a version that is one larger than anything present
379 # in the grid, according to the servermap.
380 self._new_seqnum = self._servermap.highest_seqnum() + 1
382 # If we don't have a servermap, that's because we're doing the
385 self._servermap = ServerMap()
386 self._status.set_servermap(self._servermap)
388 self.log(format="new seqnum will be %(seqnum)d",
389 seqnum=self._new_seqnum, level=log.NOISY)
391 # having an up-to-date servermap (or using a filenode that was just
392 # created for the first time) also guarantees that the following
393 # fields are available
394 self.readkey = self._node.get_readkey()
395 self.required_shares = self._node.get_required_shares()
396 assert self.required_shares is not None
397 self.total_shares = self._node.get_total_shares()
398 assert self.total_shares is not None
399 self._status.set_encoding(self.required_shares, self.total_shares)
401 self._pubkey = self._node.get_pubkey()
403 self._privkey = self._node.get_privkey()
405 self._encprivkey = self._node.get_encprivkey()
407 sb = self._storage_broker
408 full_peerlist = [(s.get_serverid(), s.get_rref())
409 for s in sb.get_servers_for_psi(self._storage_index)]
410 self.full_peerlist = full_peerlist # for use later, immutable
411 self.bad_peers = set() # peerids who have errbacked/refused requests
413 # This will set self.segment_size, self.num_segments, and
415 self.setup_encoding_parameters()
417 # if we experience any surprises (writes which were rejected because
418 # our test vector did not match, or shares which we didn't expect to
419 # see), we set this flag and report an UncoordinatedWriteError at the
420 # end of the publish process.
421 self.surprised = False
423 # we keep track of three tables. The first is our goal: which share
424 # we want to see on which servers. This is initially populated by the
425 # existing servermap.
426 self.goal = set() # pairs of (peerid, shnum) tuples
428 # the second table is our list of outstanding queries: those which
429 # are in flight and may or may not be delivered, accepted, or
430 # acknowledged. Items are added to this table when the request is
431 # sent, and removed when the response returns (or errbacks).
432 self.outstanding = set() # (peerid, shnum) tuples
434 # the third is a table of successes: shares which have actually been
435 # placed. These are populated when responses come back with success.
436 # When self.placed == self.goal, we're done.
437 self.placed = set() # (peerid, shnum) tuples
439 # we also keep a mapping from peerid to RemoteReference. Each time we
440 # pull a connection out of the full peerlist, we add it to this for
442 self.connections = {}
444 self.bad_share_checkstrings = {}
446 # This is set at the last step of the publishing process.
447 self.versioninfo = ""
449 # we use the servermap to populate the initial goal: this way we will
450 # try to update each existing share in place.
451 for (peerid, shnum) in self._servermap.servermap:
452 self.goal.add( (peerid, shnum) )
453 self.connections[peerid] = self._servermap.connections[peerid]
454 # then we add in all the shares that were bad (corrupted, bad
455 # signatures, etc). We want to replace these.
456 for key, old_checkstring in self._servermap.bad_shares.items():
457 (peerid, shnum) = key
459 self.bad_share_checkstrings[key] = old_checkstring
460 self.connections[peerid] = self._servermap.connections[peerid]
462 # TODO: Make this part do peer selection.
465 if self._version == MDMF_VERSION:
466 writer_class = MDMFSlotWriteProxy
468 writer_class = SDMFSlotWriteProxy
470 # For each (peerid, shnum) in self.goal, we make a
471 # write proxy for that peer. We'll use this to write
472 # shares to the peer.
473 for key in self.goal:
475 write_enabler = self._node.get_write_enabler(peerid)
476 renew_secret = self._node.get_renewal_secret(peerid)
477 cancel_secret = self._node.get_cancel_secret(peerid)
478 secrets = (write_enabler, renew_secret, cancel_secret)
480 self.writers[shnum] = writer_class(shnum,
481 self.connections[peerid],
485 self.required_shares,
489 self.writers[shnum].peerid = peerid
490 if (peerid, shnum) in self._servermap.servermap:
491 old_versionid, old_timestamp = self._servermap.servermap[key]
492 (old_seqnum, old_root_hash, old_salt, old_segsize,
493 old_datalength, old_k, old_N, old_prefix,
494 old_offsets_tuple) = old_versionid
495 self.writers[shnum].set_checkstring(old_seqnum,
498 elif (peerid, shnum) in self.bad_share_checkstrings:
499 old_checkstring = self.bad_share_checkstrings[(peerid, shnum)]
500 self.writers[shnum].set_checkstring(old_checkstring)
502 # Our remote shares will not have a complete checkstring until
503 # after we are done writing share data and have started to write
504 # blocks. In the meantime, we need to know what to look for when
505 # writing, so that we can detect UncoordinatedWriteErrors.
506 self._checkstring = self.writers.values()[0].get_checkstring()
508 # Now, we start pushing shares.
509 self._status.timings["setup"] = time.time() - self._started
510 # First, we encrypt and encode each segment of the file, then
511 # push the resulting blocks out to the writers.
513 # This will eventually hold the block hash chain for each share
514 # that we publish. We define it this way so that empty publishes
515 # will still have something to write to the remote slot.
516 self.blockhashes = dict([(i, []) for i in xrange(self.total_shares)])
517 for i in xrange(self.total_shares):
518 blocks = self.blockhashes[i]
519 for j in xrange(self.num_segments):
521 self.sharehash_leaves = None # eventually [sharehashes]
522 self.sharehashes = {} # shnum -> [sharehash leaves necessary to
523 # validate the share]
525 self.log("Starting push")
527 self._state = PUSHING_BLOCKS_STATE
530 return self.done_deferred
533 def _update_status(self):
534 self._status.set_status("Sending Shares: %d placed out of %d, "
535 "%d messages outstanding" %
538 len(self.outstanding)))
539 self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
542 def setup_encoding_parameters(self, offset=0):
543 if self._version == MDMF_VERSION:
544 segment_size = DEFAULT_MAX_SEGMENT_SIZE # 128 KiB by default
546 segment_size = self.datalength # SDMF is only one segment
547 # this must be a multiple of self.required_shares
548 segment_size = mathutil.next_multiple(segment_size,
549 self.required_shares)
550 self.segment_size = segment_size
552 # Calculate the starting segment for the upload.
554 # We use div_ceil instead of integer division here because
555 # it is semantically correct.
556 # If datalength isn't an even multiple of segment_size,
557 # datalength // segment_size counts only the complete
558 # segments and silently drops the partial segment at the
559 # end. But that's not what we want, because it ignores the
560 # extra data. div_ceil rounds up, so it gives us
561 # the right number of segments for the data that we're
563 self.num_segments = mathutil.div_ceil(self.datalength,
566 self.starting_segment = offset // segment_size
569 self.num_segments = 0
570 self.starting_segment = 0
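# Worked example (toy numbers, for illustration only): with
# required_shares k = 3 and segment_size = 9 (already a multiple of k),
# a datalength of 25 gives
#
#     div_ceil(25, 9) == 3    # two full segments plus a 7-byte tail
#     25 // 9         == 2    # would silently drop the 7-byte tail
#
# and a full publish (offset 0) starts at segment 0.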
573 self.log("building encoding parameters for file")
574 self.log("got segsize %d" % self.segment_size)
575 self.log("got %d segments" % self.num_segments)
577 if self._version == SDMF_VERSION:
578 assert self.num_segments in (0, 1) # SDMF
579 # calculate the tail segment size.
581 if segment_size and self.datalength:
582 self.tail_segment_size = self.datalength % segment_size
583 self.log("got tail segment size %d" % self.tail_segment_size)
585 self.tail_segment_size = 0
587 if self.tail_segment_size == 0 and segment_size:
588 # The tail segment is the same size as the other segments.
589 self.tail_segment_size = segment_size
592 fec = codec.CRSEncoder()
593 fec.set_params(self.segment_size,
594 self.required_shares, self.total_shares)
595 self.piece_size = fec.get_block_size()
598 if self.tail_segment_size == self.segment_size:
599 self.tail_fec = self.fec
601 tail_fec = codec.CRSEncoder()
602 tail_fec.set_params(self.tail_segment_size,
603 self.required_shares,
605 self.tail_fec = tail_fec
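# Illustrative note: CRSEncoder is configured with (segment_size, k, N),
# so each segment is erasure-coded into N blocks, any k of which suffice
# to reconstruct it. Since segment_size was rounded up to a multiple of
# k above, each block is roughly segment_size / k bytes, which is what
# fec.get_block_size() reports. The tail segment gets its own encoder
# unless it happens to be exactly segment_size long.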
607 self._current_segment = self.starting_segment
608 self.end_segment = self.num_segments - 1
609 # Now figure out where the last segment should be.
610 if self.data.get_size() != self.datalength:
611 # We're updating a few segments in the middle of a mutable
612 # file, so we don't want to republish the whole thing.
613 # (we don't have enough data to do that even if we wanted
615 end = self.data.get_size()
616 self.end_segment = end // segment_size
617 if end % segment_size == 0:
618 self.end_segment -= 1
620 self.log("got start segment %d" % self.starting_segment)
621 self.log("got end segment %d" % self.end_segment)
624 def _push(self, ignored=None):
626 I manage state transitions. In particular, I see that we still
627 have a good enough number of writers to complete the upload
630 # Can we still successfully publish this file?
631 # TODO: Keep track of outstanding queries before aborting the
633 if len(self.writers) < self.required_shares or self.surprised:
634 return self._failure()
636 # Figure out what we need to do next. Each of these needs to
637 # return a deferred so that we don't block execution when this
638 # is first called in the upload method.
639 if self._state == PUSHING_BLOCKS_STATE:
640 return self.push_segment(self._current_segment)
642 elif self._state == PUSHING_EVERYTHING_ELSE_STATE:
643 return self.push_everything_else()
645 # If we make it to this point, we were successful in placing the
650 def push_segment(self, segnum):
651 if self.num_segments == 0 and self._version == SDMF_VERSION:
652 self._add_dummy_salts()
654 if segnum > self.end_segment:
655 # We don't have any more segments to push.
656 self._state = PUSHING_EVERYTHING_ELSE_STATE
659 d = self._encode_segment(segnum)
660 d.addCallback(self._push_segment, segnum)
661 def _increment_segnum(ign):
662 self._current_segment += 1
663 # XXX: I don't think we need to do addBoth here -- any errbacks
664 # should be handled within push_segment.
665 d.addCallback(_increment_segnum)
666 d.addCallback(self._turn_barrier)
667 d.addCallback(self._push)
668 d.addErrback(self._failure)
671 def _turn_barrier(self, result):
673 I help the publish process avoid the recursion limit issues
676 return fireEventually(result)
679 def _add_dummy_salts(self):
681 SDMF files need a salt even if they're empty, or the signature
682 won't make sense. This method adds a dummy salt to each of our
683 SDMF writers so that they can write the signature later.
685 salt = os.urandom(16)
686 assert self._version == SDMF_VERSION
688 for writer in self.writers.itervalues():
689 writer.put_salt(salt)
692 def _encode_segment(self, segnum):
694 I encrypt and encode the segment segnum.
696 started = time.time()
698 if segnum + 1 == self.num_segments:
699 segsize = self.tail_segment_size
701 segsize = self.segment_size
704 self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
705 data = self.data.read(segsize)
706 # XXX: This is dumb. Why return a list?
709 assert len(data) == segsize, len(data)
711 salt = os.urandom(16)
713 key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
714 self._status.set_status("Encrypting")
716 crypttext = enc.process(data)
717 assert len(crypttext) == len(data)
720 self._status.accumulate_encrypt_time(now - started)
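# Illustrative note: every segment is encrypted under its own key -- the
# 16-byte salt above is freshly generated per segment, and the AES key
# is derived as ssk_readkey_data_hash(salt, self.readkey) -- so two
# segments with identical plaintext encrypt to unrelated ciphertext.
# The cipher is length-preserving, which the assert above relies on.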
724 if segnum + 1 == self.num_segments:
729 self._status.set_status("Encoding")
730 crypttext_pieces = [None] * self.required_shares
731 piece_size = fec.get_block_size()
732 for i in range(len(crypttext_pieces)):
733 offset = i * piece_size
734 piece = crypttext[offset:offset+piece_size]
735 piece = piece + "\x00"*(piece_size - len(piece)) # padding
736 crypttext_pieces[i] = piece
737 assert len(piece) == piece_size
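# Worked example of the padding (toy numbers): with k = 3 and
# piece_size = 4, a 10-byte crypttext is sliced into bytes [0:4], [4:8]
# and [8:12]; the last slice yields only 2 bytes, so it is padded with
# two "\x00" bytes before being handed to fec.encode().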
738 d = fec.encode(crypttext_pieces)
739 def _done_encoding(res):
740 elapsed = time.time() - started
741 self._status.accumulate_encode_time(elapsed)
743 d.addCallback(_done_encoding)
747 def _push_segment(self, encoded_and_salt, segnum):
749 I push (data, salt) as segment number segnum.
751 results, salt = encoded_and_salt
752 shares, shareids = results
753 self._status.set_status("Pushing segment")
754 for i in xrange(len(shares)):
755 sharedata = shares[i]
756 shareid = shareids[i]
757 if self._version == MDMF_VERSION:
758 hashed = salt + sharedata
761 block_hash = hashutil.block_hash(hashed)
762 self.blockhashes[shareid][segnum] = block_hash
763 # find the writer for this share
764 writer = self.writers[shareid]
765 writer.put_block(sharedata, segnum, salt)
768 def push_everything_else(self):
770 I put everything else associated with a share.
772 self._pack_started = time.time()
773 self.push_encprivkey()
774 self.push_blockhashes()
775 self.push_sharehashes()
776 self.push_toplevel_hashes_and_signature()
777 d = self.finish_publishing()
778 def _change_state(ignored):
779 self._state = DONE_STATE
780 d.addCallback(_change_state)
781 d.addCallback(self._push)
785 def push_encprivkey(self):
786 encprivkey = self._encprivkey
787 self._status.set_status("Pushing encrypted private key")
788 for writer in self.writers.itervalues():
789 writer.put_encprivkey(encprivkey)
792 def push_blockhashes(self):
793 self.sharehash_leaves = [None] * len(self.blockhashes)
794 self._status.set_status("Building and pushing block hash tree")
795 for shnum, blockhashes in self.blockhashes.iteritems():
796 t = hashtree.HashTree(blockhashes)
797 self.blockhashes[shnum] = list(t)
798 # set the leaf for future use.
799 self.sharehash_leaves[shnum] = t[0]
801 writer = self.writers[shnum]
802 writer.put_blockhashes(self.blockhashes[shnum])
805 def push_sharehashes(self):
806 self._status.set_status("Building and pushing share hash chain")
807 share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
808 for shnum in xrange(len(self.sharehash_leaves)):
809 needed_indices = share_hash_tree.needed_hashes(shnum)
810 self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
811 for i in needed_indices] )
812 writer = self.writers[shnum]
813 writer.put_sharehashes(self.sharehashes[shnum])
814 self.root_hash = share_hash_tree[0]
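# Illustrative note: hashtree.HashTree lays the Merkle tree out as a
# flat list with the root at index 0, which is why the root hash is
# share_hash_tree[0]. needed_hashes(shnum) names the internal nodes a
# downloader needs in order to check share shnum's leaf against that
# root, and that subset is exactly what we hand to each writer above.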
817 def push_toplevel_hashes_and_signature(self):
818 # We need to do three things here:
819 # - Push the root hash and salt hash
820 # - Get the checkstring of the resulting layout; sign that.
821 # - Push the signature
822 self._status.set_status("Pushing root hashes and signature")
823 for shnum in xrange(self.total_shares):
824 writer = self.writers[shnum]
825 writer.put_root_hash(self.root_hash)
826 self._update_checkstring()
827 self._make_and_place_signature()
830 def _update_checkstring(self):
832 After putting the root hash, MDMF files will have the
833 checkstring written to the storage server. This means that we
834 can update our copy of the checkstring so we can detect
835 uncoordinated writes. SDMF files will have the same checkstring,
836 so we need not do anything.
838 self._checkstring = self.writers.values()[0].get_checkstring()
841 def _make_and_place_signature(self):
843 I create and place the signature.
845 started = time.time()
846 self._status.set_status("Signing prefix")
847 signable = self.writers[0].get_signable()
848 self.signature = self._privkey.sign(signable)
850 for (shnum, writer) in self.writers.iteritems():
851 writer.put_signature(self.signature)
852 self._status.timings['sign'] = time.time() - started
855 def finish_publishing(self):
856 # We're almost done -- we just need to put the verification key
858 started = time.time()
859 self._status.set_status("Pushing shares")
860 self._started_pushing = started
862 verification_key = self._pubkey.serialize()
865 # TODO: Bad, since we remove from this same dict. We need to
866 # make a copy, or just use a non-iterated value.
867 for (shnum, writer) in self.writers.iteritems():
868 writer.put_verification_key(verification_key)
869 d = writer.finish_publishing()
870 d.addErrback(self._connection_problem, writer)
871 # Add the (peerid, shnum) tuple to our list of outstanding
872 # queries. This gets used by _loop if some of our queries
873 # fail to place shares.
874 self.outstanding.add((writer.peerid, writer.shnum))
875 d.addCallback(self._got_write_answer, writer, started)
877 self._record_verinfo()
878 self._status.timings['pack'] = time.time() - started
879 return defer.DeferredList(ds)
882 def _record_verinfo(self):
883 self.versioninfo = self.writers.values()[0].get_verinfo()
886 def _connection_problem(self, f, writer):
888 We ran into a connection problem while working with writer, and
889 need to deal with that.
891 self.log("found problem: %s" % str(f))
892 self._last_failure = f
893 del(self.writers[writer.shnum])
896 def log_goal(self, goal, message=""):
898 for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]):
899 logmsg.append("sh%d to [%s]" % (shnum,
900 idlib.shortnodeid_b2a(peerid)))
901 self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
902 self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
905 def update_goal(self):
906 # if log.recording_noisy
908 self.log_goal(self.goal, "before update: ")
910 # first, remove any bad peers from our goal
911 self.goal = set([ (peerid, shnum)
912 for (peerid, shnum) in self.goal
913 if peerid not in self.bad_peers ])
915 # find the homeless shares:
916 homefull_shares = set([shnum for (peerid, shnum) in self.goal])
917 homeless_shares = set(range(self.total_shares)) - homefull_shares
918 homeless_shares = sorted(list(homeless_shares))
919 # place them somewhere. We prefer unused servers at the beginning of
920 # the available peer list.
922 if not homeless_shares:
925 # if an old share X is on a node, put the new share X there too.
926 # TODO: 1: redistribute shares to achieve one-per-peer, by copying
927 # shares from existing peers to new (less-crowded) ones. The
928 # old shares must still be updated.
929 # TODO: 2: move those shares instead of copying them, to reduce future
932 # this is a bit CPU intensive but easy to analyze. We create a sort
933 # order for each peerid. If the peerid is marked as bad, we don't
934 # even put them in the list. Then we care about the number of shares
935 # which have already been assigned to them. After that we care about
936 # their permutation order.
937 old_assignments = DictOfSets()
938 for (peerid, shnum) in self.goal:
939 old_assignments.add(peerid, shnum)
942 for i, (peerid, ss) in enumerate(self.full_peerlist):
943 if peerid in self.bad_peers:
945 entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
946 peerlist.append(entry)
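# Illustrative note: because tuples sort lexicographically, ordering
# these entries puts peers with fewer already-assigned shares first and
# breaks ties by permuted-list position: (0, 3, ...) -- an unused peer
# that appeared third in the permuted list -- sorts ahead of
# (1, 0, ...), a peer that already holds a share.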
950 raise NotEnoughServersError("Ran out of non-bad servers, "
952 str(self._first_write_error),
953 self._first_write_error)
955 # we then index this peerlist with an integer, because we may have to
956 # wrap. We update the goal as we go.
958 for shnum in homeless_shares:
959 (ignored1, ignored2, peerid, ss) = peerlist[i]
960 # if we are forced to send a share to a server that already has
961 # one, we may have two write requests in flight, and the
962 # servermap (which was computed before either request was sent)
963 # won't reflect the new shares, so the second response will be
964 # surprising. There is code in _got_write_answer() to tolerate
965 # this, otherwise it would cause the publish to fail with an
966 # UncoordinatedWriteError. See #546 for details of the trouble
967 # this used to cause.
968 self.goal.add( (peerid, shnum) )
969 self.connections[peerid] = ss
971 if i >= len(peerlist):
974 self.log_goal(self.goal, "after update: ")
977 def _got_write_answer(self, answer, writer, started):
979 # SDMF writers only pretend to write when callers set their
980 # blocks, salts, and so on -- they actually just write once,
981 # at the end of the upload process. In fake writes, they
982 # return defer.succeed(None). If we see that, we shouldn't
983 # bother checking it.
986 peerid = writer.peerid
987 lp = self.log("_got_write_answer from %s, share %d" %
988 (idlib.shortnodeid_b2a(peerid), writer.shnum))
991 elapsed = now - started
993 self._status.add_per_server_time(peerid, elapsed)
995 wrote, read_data = answer
997 surprise_shares = set(read_data.keys()) - set([writer.shnum])
999 # We need to remove from surprise_shares any shares that we are
1000 # knowingly also writing to that peer from other writers.
1002 # TODO: Precompute this.
1003 known_shnums = [x.shnum for x in self.writers.values()
1004 if x.peerid == peerid]
1005 surprise_shares -= set(known_shnums)
1006 self.log("found the following surprise shares: %s" %
1007 str(surprise_shares))
1009 # Now surprise shares contains all of the shares that we did not
1010 # expect to be there.
1013 for shnum in surprise_shares:
1014 # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
1015 checkstring = read_data[shnum][0]
1016 # What we want to do here is to see if their (seqnum,
1017 # roothash, salt) is the same as our (seqnum, roothash,
1018 # salt), or the equivalent for MDMF. The best way to do this
1019 # is to store a packed representation of our checkstring
1020 # somewhere, then not bother unpacking the other
1022 if checkstring == self._checkstring:
1023 # they have the right share, somehow
1025 if (peerid,shnum) in self.goal:
1026 # and we want them to have it, so we probably sent them a
1027 # copy in an earlier write. This is ok, and avoids the
1031 # They aren't in our goal, but they are still for the right
1032 # version. Somebody else wrote them, and it's a convergent
1033 # uncoordinated write. Pretend this is ok (don't be
1034 # surprised), since I suspect there's a decent chance that
1035 # we'll hit this in normal operation.
1039 # the new shares are of a different version
1040 if peerid in self._servermap.reachable_peers:
1041 # we asked them about their shares, so we had knowledge
1042 # of what they used to have. Any surprising shares must
1043 # have come from someone else, so UCW.
1046 # we didn't ask them, and now we've discovered that they
1047 # have a share we didn't know about. This indicates that
1048 # mapupdate should have worked harder and asked more
1049 # servers before concluding that it knew about them all.
1051 # signal UCW, but make sure to ask this peer next time,
1052 # so we'll remember to update it if/when we retry.
1054 # TODO: ask this peer next time. I don't yet have a good
1055 # way to do this. Two insufficient possibilities are:
1057 # self._servermap.add_new_share(peerid, shnum, verinfo, now)
1058 # but that requires fetching/validating/parsing the whole
1059 # version string, and all we have is the checkstring
1060 # self._servermap.mark_bad_share(peerid, shnum, checkstring)
1061 # that will make publish overwrite the share next time,
1062 # but it won't re-query the server, and it won't make
1063 # mapupdate search further
1065 # TODO later: when publish starts, do
1066 # servermap.get_best_version(), extract the seqnum,
1067 # subtract one, and store as highest-replaceable-seqnum.
1068 # Then, if this surprise-because-we-didn't-ask share is
1069 # of highest-replaceable-seqnum or lower, we're allowed
1070 # to replace it: send out a new writev (or rather add it
1071 # to self.goal and loop).
1077 self.log("they had shares %s that we didn't know about" %
1078 (list(surprise_shares),),
1079 parent=lp, level=log.WEIRD, umid="un9CSQ")
1080 self.surprised = True
1083 # TODO: there are two possibilities. The first is that the server
1084 # is full (or just doesn't want to give us any room), which means
1085 # we shouldn't ask them again, but which is *not* an indication of an
1086 # uncoordinated write. The second is that our testv failed, which
1087 # *does* indicate an uncoordinated write. We currently don't have
1088 # a way to tell these two apart (in fact, the storage server code
1089 # doesn't have the option of refusing our share).
1091 # If the server is full, mark the peer as bad (so we don't ask
1092 # them again), but don't set self.surprised. The loop() will find
1095 # If the testv failed, log it, set self.surprised, but don't
1096 # bother adding to self.bad_peers.
1098 self.log("our testv failed, so the write did not happen",
1099 parent=lp, level=log.WEIRD, umid="8sc26g")
1100 self.surprised = True
1101 self.bad_peers.add(writer) # don't ask them again
1102 # use the checkstring to add information to the log message
1103 for (shnum,readv) in read_data.items():
1104 checkstring = readv[0]
1107 other_salt) = unpack_checkstring(checkstring)
1108 expected_version = self._servermap.version_on_peer(peerid,
1110 if expected_version:
1111 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
1112 offsets_tuple) = expected_version
1113 self.log("somebody modified the share on us:"
1114 " shnum=%d: I thought they had #%d:R=%s,"
1115 " but testv reported #%d:R=%s" %
1117 seqnum, base32.b2a(root_hash)[:4],
1118 other_seqnum, base32.b2a(other_roothash)[:4]),
1119 parent=lp, level=log.NOISY)
1120 # if expected_version==None, then we didn't expect to see a
1121 # share on that peer, and the 'surprise_shares' clause above
1122 # will have logged it.
1125 # and update the servermap
1126 # self.versioninfo is set during the last phase of publishing.
1127 # If we get there, we know that responses correspond to placed
1128 # shares, and can safely execute these statements.
1129 if self.versioninfo:
1130 self.log("wrote successfully: adding new share to servermap")
1131 self._servermap.add_new_share(peerid, writer.shnum,
1132 self.versioninfo, started)
1133 self.placed.add( (peerid, writer.shnum) )
1134 self._update_status()
1135 # the next method in the deferred chain will check to see if
1136 # we're done and successful.
1141 if not self._running:
1143 self._running = False
1145 self._status.timings["total"] = now - self._started
1147 elapsed = now - self._started_pushing
1148 self._status.timings['push'] = elapsed
1150 self._status.set_active(False)
1151 self.log("Publish done, success")
1152 self._status.set_status("Finished")
1153 self._status.set_progress(1.0)
1154 # Get k and segsize, then give them to the caller.
1156 hints['segsize'] = self.segment_size
1157 hints['k'] = self.required_shares
1158 self._node.set_downloader_hints(hints)
1159 eventually(self.done_deferred.callback, None)
1161 def _failure(self, f=None):
1163 self._last_failure = f
1165 if not self.surprised:
1166 # We ran out of servers
1167 msg = "Publish ran out of good servers"
1168 if self._last_failure:
1169 msg += ", last failure was: %s" % str(self._last_failure)
1171 e = NotEnoughServersError(msg)
1174 # We ran into shares that we didn't recognize, which means
1175 # that we need to return an UncoordinatedWriteError.
1176 self.log("Publish failed with UncoordinatedWriteError")
1177 e = UncoordinatedWriteError()
1178 f = failure.Failure(e)
1179 eventually(self.done_deferred.callback, f)
1182 class MutableFileHandle:
1184 I am a mutable uploadable built around a filehandle-like object,
1185 usually either a StringIO instance or a handle to an actual file.
1187 implements(IMutableUploadable)
1189 def __init__(self, filehandle):
1190 # The filehandle is defined as a generally file-like object that
1191 # has these two methods. We don't care beyond that.
1192 assert hasattr(filehandle, "read")
1193 assert hasattr(filehandle, "close")
1195 self._filehandle = filehandle
1196 # We must start reading at the beginning of the file, or we risk
1197 # encountering errors when the data read does not match the size
1198 # reported to the uploader.
1199 self._filehandle.seek(0)
1201 # We have not yet read anything, so our position is 0.
1207 I return the amount of data in my filehandle.
1209 if not hasattr(self, "_size"):
1210 old_position = self._filehandle.tell()
1211 # Seek to the end of the file by seeking 0 bytes from the
1213 self._filehandle.seek(0, 2) # 2 == os.SEEK_END in 2.5+
1214 self._size = self._filehandle.tell()
1215 # Restore the previous position, in case this was called
1217 self._filehandle.seek(old_position)
1218 assert self._filehandle.tell() == old_position
1220 assert hasattr(self, "_size")
1226 I return the position of my read marker -- i.e., how much data I
1227 have already read and returned to callers.
1232 def read(self, length):
1234 I return some data (up to length bytes) from my filehandle.
1236 In most cases, I return length bytes, but sometimes I won't --
1237 for example, if I am asked to read beyond the end of a file, or
1240 results = self._filehandle.read(length)
1241 self._marker += len(results)
1247 I close the underlying filehandle. Any further operations on the
1248 filehandle fail at this point.
1250 self._filehandle.close()
1253 class MutableData(MutableFileHandle):
1255 I am a mutable uploadable built around a string, which I then cast
1256 into a StringIO and treat as a filehandle.
1259 def __init__(self, s):
1260 # Take a string and return a file-like uploadable.
1261 assert isinstance(s, str)
1263 MutableFileHandle.__init__(self, StringIO(s))
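# A small usage sketch (not exercised anywhere in this module):
#
#     data = MutableData("new contents")
#     data.get_size()      # -> 12, the length of the wrapped string
#     chunk = data.read(3) # data comes back in chunks; callers join them
#     data.pos()           # -> 3, how much has been read so far
#     data.close()
#
# MutableFileHandle behaves the same way around an already-open, seekable
# filehandle; both provide IMutableUploadable, which is what
# Publish.publish() and Publish.update() expect as their data argument.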
1266 class TransformingUploadable:
1268 I am an IMutableUploadable that wraps another IMutableUploadable,
1269 and some segments that are already on the grid. When I am called to
1270 read, I handle merging of boundary segments.
1272 implements(IMutableUploadable)
1275 def __init__(self, data, offset, segment_size, start, end):
1276 assert IMutableUploadable.providedBy(data)
1278 self._newdata = data
1279 self._offset = offset
1280 self._segment_size = segment_size
1284 self._read_marker = 0
1286 self._first_segment_offset = offset % segment_size
1288 num = self.log("TransformingUploadable: starting", parent=None)
1289 self._log_number = num
1290 self.log("got fso: %d" % self._first_segment_offset)
1291 self.log("got offset: %d" % self._offset)
1294 def log(self, *args, **kwargs):
1295 if 'parent' not in kwargs:
1296 kwargs['parent'] = self._log_number
1297 if "facility" not in kwargs:
1298 kwargs["facility"] = "tahoe.mutable.transforminguploadable"
1299 return log.msg(*args, **kwargs)
1303 return self._offset + self._newdata.get_size()
1306 def read(self, length):
1307 # We can get data from 3 sources here.
1308 # 1. The first of the segments provided to us.
1309 # 2. The data that we're replacing things with.
1310 # 3. The last of the segments provided to us.
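# Worked example (toy numbers): with segment_size = 4 and offset = 6,
# _first_segment_offset is 6 % 4 == 2, so a read must first return two
# bytes of the old starting segment (source 1) before any new data
# (source 2); once the new data runs out, the rest of the last affected
# segment (source 3) is appended so that only whole segments get pushed.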
1312 # are we in state 0?
1313 self.log("reading %d bytes" % length)
1316 old_data_length = self._first_segment_offset - self._read_marker
1317 if old_data_length > 0:
1318 if old_data_length > length:
1319 old_data_length = length
1320 self.log("returning %d bytes of old start data" % old_data_length)
1322 old_data_end = old_data_length + self._read_marker
1323 old_start_data = self._start[self._read_marker:old_data_end]
1324 length -= old_data_length
1326 # otherwise calculations later get screwed up.
1329 # Is there enough new data to satisfy this read? If not, we need
1330 # to pad the end of the data with data from our last segment.
1331 old_end_length = length - \
1332 (self._newdata.get_size() - self._newdata.pos())
1334 if old_end_length > 0:
1335 self.log("reading %d bytes of old end data" % old_end_length)
1337 # TODO: We're not explicitly checking for tail segment size
1338 # here. Is that a problem?
1339 old_data_offset = (length - old_end_length + \
1340 old_data_length) % self._segment_size
1341 self.log("reading at offset %d" % old_data_offset)
1342 old_end = old_data_offset + old_end_length
1343 old_end_data = self._end[old_data_offset:old_end]
1344 length -= old_end_length
1345 assert length == self._newdata.get_size() - self._newdata.pos()
1347 self.log("reading %d bytes of new data" % length)
1348 new_data = self._newdata.read(length)
1349 new_data = "".join(new_data)
1351 self._read_marker += len(old_start_data + new_data + old_end_data)
1353 return old_start_data + new_data + old_end_data