4 from StringIO import StringIO
5 from itertools import count
6 from zope.interface import implements
7 from twisted.internet import defer
8 from twisted.python import failure
9 from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION, \
11 from allmydata.util import base32, hashutil, mathutil, idlib, log
12 from allmydata.util.dictutil import DictOfSets
13 from allmydata import hashtree, codec
14 from allmydata.storage.server import si_b2a
15 from pycryptopp.cipher.aes import AES
16 from foolscap.api import eventually, fireEventually
18 from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, \
19 UncoordinatedWriteError, NotEnoughServersError
20 from allmydata.mutable.servermap import ServerMap
21 from allmydata.mutable.layout import get_version_from_checkstring,\
22 unpack_mdmf_checkstring, \
23 unpack_sdmf_checkstring, \
25 SDMFSlotWriteProxy, MDMFCHECKSTRING
28 DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
29 PUSHING_BLOCKS_STATE = 0
30 PUSHING_EVERYTHING_ELSE_STATE = 1
34 implements(IPublishStatus)
35 statusid_counter = count(0)
38 self.timings["send_per_server"] = {}
39 self.timings["encrypt"] = 0.0
40 self.timings["encode"] = 0.0
44 self.storage_index = None
46 self.encoding = ("?", "?")
48 self.status = "Not started"
50 self.counter = self.statusid_counter.next()
51 self.started = time.time()
53 def add_per_server_time(self, peerid, elapsed):
54 if peerid not in self.timings["send_per_server"]:
55 self.timings["send_per_server"][peerid] = []
56 self.timings["send_per_server"][peerid].append(elapsed)
57 def accumulate_encode_time(self, elapsed):
58 self.timings["encode"] += elapsed
59 def accumulate_encrypt_time(self, elapsed):
60 self.timings["encrypt"] += elapsed
62 def get_started(self):
    def get_storage_index(self):
        # Accessor for the storage index recorded via set_storage_index();
        # None until the publish operation sets it.
        return self.storage_index
66 def get_encoding(self):
68 def using_helper(self):
70 def get_servermap(self):
76 def get_progress(self):
80 def get_counter(self):
    def set_storage_index(self, si):
        # Record the storage index of the mutable file being published.
        self.storage_index = si
85 def set_helper(self, helper):
    def set_servermap(self, servermap):
        # Record the ServerMap used for this publish, for status display.
        self.servermap = servermap
    def set_encoding(self, k, n):
        # Record the erasure-coding parameters as a (k, n) pair:
        # k = shares required to recover, n = total shares produced.
        self.encoding = (k, n)
91 def set_size(self, size):
93 def set_status(self, status):
95 def set_progress(self, value):
97 def set_active(self, value):
100 class LoopLimitExceededError(Exception):
104 """I represent a single act of publishing the mutable file to the grid. I
105 will only publish my data if the servermap I am using still represents
106 the current state of the world.
108 To make the initial publish, set servermap to None.
111 def __init__(self, filenode, storage_broker, servermap):
112 self._node = filenode
113 self._storage_broker = storage_broker
114 self._servermap = servermap
115 self._storage_index = self._node.get_storage_index()
116 self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
117 num = self.log("Publish(%s): starting" % prefix, parent=None)
118 self._log_number = num
120 self._first_write_error = None
121 self._last_failure = None
123 self._status = PublishStatus()
124 self._status.set_storage_index(self._storage_index)
125 self._status.set_helper(False)
126 self._status.set_progress(0.0)
127 self._status.set_active(True)
128 self._version = self._node.get_version()
129 assert self._version in (SDMF_VERSION, MDMF_VERSION)
132 def get_status(self):
135 def log(self, *args, **kwargs):
136 if 'parent' not in kwargs:
137 kwargs['parent'] = self._log_number
138 if "facility" not in kwargs:
139 kwargs["facility"] = "tahoe.mutable.publish"
140 return log.msg(*args, **kwargs)
143 def update(self, data, offset, blockhashes, version):
145 I replace the contents of this file with the contents of data,
146 starting at offset. I return a Deferred that fires with None
147 when the replacement has been completed, or with an error if
148 something went wrong during the process.
150 Note that this process will not upload new shares. If the file
151 being updated is in need of repair, callers will have to repair
155 # 1: Make peer assignments. We'll assign each share that we know
156 # about on the grid to that peer that currently holds that
157 # share, and will not place any new shares.
158 # 2: Setup encoding parameters. Most of these will stay the same
159 # -- datalength will change, as will some of the offsets.
160 # 3. Upload the new segments.
162 assert IMutableUploadable.providedBy(data)
166 # XXX: Use the MutableFileVersion instead.
167 self.datalength = self._node.get_size()
168 if data.get_size() > self.datalength:
169 self.datalength = data.get_size()
171 self.log("starting update")
172 self.log("adding new data of length %d at offset %d" % \
173 (data.get_size(), offset))
174 self.log("new data length is %d" % self.datalength)
175 self._status.set_size(self.datalength)
176 self._status.set_status("Started")
177 self._started = time.time()
179 self.done_deferred = defer.Deferred()
181 self._writekey = self._node.get_writekey()
182 assert self._writekey, "need write capability to publish"
184 # first, which servers will we publish to? We require that the
185 # servermap was updated in MODE_WRITE, so we can depend upon the
186 # peerlist computed by that process instead of computing our own.
187 assert self._servermap
188 assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
189 # we will push a version that is one larger than anything present
190 # in the grid, according to the servermap.
191 self._new_seqnum = self._servermap.highest_seqnum() + 1
192 self._status.set_servermap(self._servermap)
194 self.log(format="new seqnum will be %(seqnum)d",
195 seqnum=self._new_seqnum, level=log.NOISY)
197 # We're updating an existing file, so all of the following
198 # should be available.
199 self.readkey = self._node.get_readkey()
200 self.required_shares = self._node.get_required_shares()
201 assert self.required_shares is not None
202 self.total_shares = self._node.get_total_shares()
203 assert self.total_shares is not None
204 self._status.set_encoding(self.required_shares, self.total_shares)
206 self._pubkey = self._node.get_pubkey()
208 self._privkey = self._node.get_privkey()
210 self._encprivkey = self._node.get_encprivkey()
212 sb = self._storage_broker
213 full_peerlist = [(s.get_serverid(), s.get_rref())
214 for s in sb.get_servers_for_psi(self._storage_index)]
215 self.full_peerlist = full_peerlist # for use later, immutable
216 self.bad_peers = set() # peerids who have errbacked/refused requests
218 # This will set self.segment_size, self.num_segments, and
219 # self.fec. TODO: Does it know how to do the offset? Probably
220 # not. So do that part next.
221 self.setup_encoding_parameters(offset=offset)
223 # if we experience any surprises (writes which were rejected because
224 # our test vector did not match, or shares which we didn't expect to
225 # see), we set this flag and report an UncoordinatedWriteError at the
226 # end of the publish process.
227 self.surprised = False
229 # we keep track of three tables. The first is our goal: which share
230 # we want to see on which servers. This is initially populated by the
231 # existing servermap.
232 self.goal = set() # pairs of (peerid, shnum) tuples
234 # the number of outstanding queries: those that are in flight and
235 # may or may not be delivered, accepted, or acknowledged. This is
236 # incremented when a query is sent, and decremented when the response
237 # returns or errbacks.
238 self.num_outstanding = 0
240 # the third is a table of successes: share which have actually been
241 # placed. These are populated when responses come back with success.
242 # When self.placed == self.goal, we're done.
243 self.placed = set() # (peerid, shnum) tuples
245 # we also keep a mapping from peerid to RemoteReference. Each time we
246 # pull a connection out of the full peerlist, we add it to this for
248 self.connections = {}
250 self.bad_share_checkstrings = {}
252 # This is set at the last step of the publishing process.
253 self.versioninfo = ""
255 # we use the servermap to populate the initial goal: this way we will
256 # try to update each existing share in place. Since we're
257 # updating, we ignore damaged and missing shares -- callers must
258 # do a repair to repair and recreate these.
259 for (peerid, shnum) in self._servermap.servermap:
260 self.goal.add( (peerid, shnum) )
261 self.connections[peerid] = self._servermap.connections[peerid]
264 # SDMF files are updated differently.
265 self._version = MDMF_VERSION
266 writer_class = MDMFSlotWriteProxy
268 # For each (peerid, shnum) in self.goal, we make a
269 # write proxy for that peer. We'll use this to write
270 # shares to the peer.
271 for key in self.goal:
273 write_enabler = self._node.get_write_enabler(peerid)
274 renew_secret = self._node.get_renewal_secret(peerid)
275 cancel_secret = self._node.get_cancel_secret(peerid)
276 secrets = (write_enabler, renew_secret, cancel_secret)
278 self.writers[shnum] = writer_class(shnum,
279 self.connections[peerid],
283 self.required_shares,
287 self.writers[shnum].peerid = peerid
288 assert (peerid, shnum) in self._servermap.servermap
289 old_versionid, old_timestamp = self._servermap.servermap[key]
290 (old_seqnum, old_root_hash, old_salt, old_segsize,
291 old_datalength, old_k, old_N, old_prefix,
292 old_offsets_tuple) = old_versionid
293 self.writers[shnum].set_checkstring(old_seqnum,
297 # Our remote shares will not have a complete checkstring until
298 # after we are done writing share data and have started to write
299 # blocks. In the meantime, we need to know what to look for when
300 # writing, so that we can detect UncoordinatedWriteErrors.
301 self._checkstring = self.writers.values()[0].get_checkstring()
303 # Now, we start pushing shares.
304 self._status.timings["setup"] = time.time() - self._started
305 # First, we encrypt, encode, and publish the shares that we need
306 # to encrypt, encode, and publish.
308 # Our update process fetched these for us. We need to update
309 # them in place as publishing happens.
310 self.blockhashes = {} # (shnum, [blochashes])
311 for (i, bht) in blockhashes.iteritems():
312 # We need to extract the leaves from our old hash tree.
313 old_segcount = mathutil.div_ceil(version[4],
315 h = hashtree.IncompleteHashTree(old_segcount)
316 bht = dict(enumerate(bht))
318 leaves = h[h.get_leaf_index(0):]
319 for j in xrange(self.num_segments - len(leaves)):
322 assert len(leaves) >= self.num_segments
323 self.blockhashes[i] = leaves
324 # This list will now be the leaves that were set during the
325 # initial upload + enough empty hashes to make it a
326 # power-of-two. If we exceed a power of two boundary, we
327 # should be encoding the file over again, and should not be
329 #assert len(self.blockhashes[i]) == \
330 # hashtree.roundup_pow2(self.num_segments), \
331 # len(self.blockhashes[i])
332 # XXX: Except this doesn't work. Figure out why.
334 # These are filled in later, after we've modified the block hash
336 self.sharehash_leaves = None # eventually [sharehashes]
337 self.sharehashes = {} # shnum -> [sharehash leaves necessary to
338 # validate the share]
340 self.log("Starting push")
342 self._state = PUSHING_BLOCKS_STATE
345 return self.done_deferred
348 def publish(self, newdata):
349 """Publish the filenode's current contents. Returns a Deferred that
350 fires (with None) when the publish has done as much work as it's ever
351 going to do, or errbacks with ConsistencyError if it detects a
355 # 0. Setup encoding parameters, encoder, and other such things.
356 # 1. Encrypt, encode, and publish segments.
357 assert IMutableUploadable.providedBy(newdata)
360 self.datalength = newdata.get_size()
361 #if self.datalength >= DEFAULT_MAX_SEGMENT_SIZE:
362 # self._version = MDMF_VERSION
364 # self._version = SDMF_VERSION
366 self.log("starting publish, datalen is %s" % self.datalength)
367 self._status.set_size(self.datalength)
368 self._status.set_status("Started")
369 self._started = time.time()
371 self.done_deferred = defer.Deferred()
373 self._writekey = self._node.get_writekey()
374 assert self._writekey, "need write capability to publish"
376 # first, which servers will we publish to? We require that the
377 # servermap was updated in MODE_WRITE, so we can depend upon the
378 # peerlist computed by that process instead of computing our own.
380 assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
381 # we will push a version that is one larger than anything present
382 # in the grid, according to the servermap.
383 self._new_seqnum = self._servermap.highest_seqnum() + 1
385 # If we don't have a servermap, that's because we're doing the
388 self._servermap = ServerMap()
389 self._status.set_servermap(self._servermap)
391 self.log(format="new seqnum will be %(seqnum)d",
392 seqnum=self._new_seqnum, level=log.NOISY)
394 # having an up-to-date servermap (or using a filenode that was just
395 # created for the first time) also guarantees that the following
396 # fields are available
397 self.readkey = self._node.get_readkey()
398 self.required_shares = self._node.get_required_shares()
399 assert self.required_shares is not None
400 self.total_shares = self._node.get_total_shares()
401 assert self.total_shares is not None
402 self._status.set_encoding(self.required_shares, self.total_shares)
404 self._pubkey = self._node.get_pubkey()
406 self._privkey = self._node.get_privkey()
408 self._encprivkey = self._node.get_encprivkey()
410 sb = self._storage_broker
411 full_peerlist = [(s.get_serverid(), s.get_rref())
412 for s in sb.get_servers_for_psi(self._storage_index)]
413 self.full_peerlist = full_peerlist # for use later, immutable
414 self.bad_peers = set() # peerids who have errbacked/refused requests
416 # This will set self.segment_size, self.num_segments, and
418 self.setup_encoding_parameters()
420 # if we experience any surprises (writes which were rejected because
421 # our test vector did not match, or shares which we didn't expect to
422 # see), we set this flag and report an UncoordinatedWriteError at the
423 # end of the publish process.
424 self.surprised = False
426 # we keep track of three tables. The first is our goal: which share
427 # we want to see on which servers. This is initially populated by the
428 # existing servermap.
429 self.goal = set() # pairs of (peerid, shnum) tuples
431 # the number of outstanding queries: those that are in flight and
432 # may or may not be delivered, accepted, or acknowledged. This is
433 # incremented when a query is sent, and decremented when the response
434 # returns or errbacks.
435 self.num_outstanding = 0
437 # the third is a table of successes: share which have actually been
438 # placed. These are populated when responses come back with success.
439 # When self.placed == self.goal, we're done.
440 self.placed = set() # (peerid, shnum) tuples
442 # we also keep a mapping from peerid to RemoteReference. Each time we
443 # pull a connection out of the full peerlist, we add it to this for
445 self.connections = {}
447 self.bad_share_checkstrings = {}
449 # This is set at the last step of the publishing process.
450 self.versioninfo = ""
452 # we use the servermap to populate the initial goal: this way we will
453 # try to update each existing share in place.
454 for (peerid, shnum) in self._servermap.servermap:
455 self.goal.add( (peerid, shnum) )
456 self.connections[peerid] = self._servermap.connections[peerid]
457 # then we add in all the shares that were bad (corrupted, bad
458 # signatures, etc). We want to replace these.
459 for key, old_checkstring in self._servermap.bad_shares.items():
460 (peerid, shnum) = key
462 self.bad_share_checkstrings[key] = old_checkstring
463 self.connections[peerid] = self._servermap.connections[peerid]
465 # TODO: Make this part do peer selection.
468 if self._version == MDMF_VERSION:
469 writer_class = MDMFSlotWriteProxy
471 writer_class = SDMFSlotWriteProxy
473 # For each (peerid, shnum) in self.goal, we make a
474 # write proxy for that peer. We'll use this to write
475 # shares to the peer.
476 for key in self.goal:
478 write_enabler = self._node.get_write_enabler(peerid)
479 renew_secret = self._node.get_renewal_secret(peerid)
480 cancel_secret = self._node.get_cancel_secret(peerid)
481 secrets = (write_enabler, renew_secret, cancel_secret)
483 self.writers[shnum] = writer_class(shnum,
484 self.connections[peerid],
488 self.required_shares,
492 self.writers[shnum].peerid = peerid
493 if (peerid, shnum) in self._servermap.servermap:
494 old_versionid, old_timestamp = self._servermap.servermap[key]
495 (old_seqnum, old_root_hash, old_salt, old_segsize,
496 old_datalength, old_k, old_N, old_prefix,
497 old_offsets_tuple) = old_versionid
498 self.writers[shnum].set_checkstring(old_seqnum,
501 elif (peerid, shnum) in self.bad_share_checkstrings:
502 old_checkstring = self.bad_share_checkstrings[(peerid, shnum)]
503 self.writers[shnum].set_checkstring(old_checkstring)
505 # Our remote shares will not have a complete checkstring until
506 # after we are done writing share data and have started to write
507 # blocks. In the meantime, we need to know what to look for when
508 # writing, so that we can detect UncoordinatedWriteErrors.
509 self._checkstring = self.writers.values()[0].get_checkstring()
511 # Now, we start pushing shares.
512 self._status.timings["setup"] = time.time() - self._started
513 # First, we encrypt, encode, and publish the shares that we need
514 # to encrypt, encode, and publish.
516 # This will eventually hold the block hash chain for each share
517 # that we publish. We define it this way so that empty publishes
518 # will still have something to write to the remote slot.
519 self.blockhashes = dict([(i, []) for i in xrange(self.total_shares)])
520 for i in xrange(self.total_shares):
521 blocks = self.blockhashes[i]
522 for j in xrange(self.num_segments):
524 self.sharehash_leaves = None # eventually [sharehashes]
525 self.sharehashes = {} # shnum -> [sharehash leaves necessary to
526 # validate the share]
528 self.log("Starting push")
530 self._state = PUSHING_BLOCKS_STATE
533 return self.done_deferred
536 def _update_status(self):
537 self._status.set_status("Sending Shares: %d placed out of %d, "
538 "%d messages outstanding" %
541 self.num_outstanding))
542 self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
545 def setup_encoding_parameters(self, offset=0):
546 if self._version == MDMF_VERSION:
547 segment_size = DEFAULT_MAX_SEGMENT_SIZE # 128 KiB by default
549 segment_size = self.datalength # SDMF is only one segment
550 # this must be a multiple of self.required_shares
551 segment_size = mathutil.next_multiple(segment_size,
552 self.required_shares)
553 self.segment_size = segment_size
555 # Calculate the starting segment for the upload.
557 # We use div_ceil instead of integer division here because
558 # it is semantically correct.
559 # If datalength isn't an even multiple of segment_size, but
560 # is larger than segment_size, datalength // segment_size
561 # will be the largest number such that num <= datalength and
562 # num % segment_size == 0. But that's not what we want,
563 # because it ignores the extra data. div_ceil will give us
564 # the right number of segments for the data that we're
566 self.num_segments = mathutil.div_ceil(self.datalength,
569 self.starting_segment = offset // segment_size
572 self.num_segments = 0
573 self.starting_segment = 0
576 self.log("building encoding parameters for file")
577 self.log("got segsize %d" % self.segment_size)
578 self.log("got %d segments" % self.num_segments)
580 if self._version == SDMF_VERSION:
581 assert self.num_segments in (0, 1) # SDMF
582 # calculate the tail segment size.
584 if segment_size and self.datalength:
585 self.tail_segment_size = self.datalength % segment_size
586 self.log("got tail segment size %d" % self.tail_segment_size)
588 self.tail_segment_size = 0
590 if self.tail_segment_size == 0 and segment_size:
591 # The tail segment is the same size as the other segments.
592 self.tail_segment_size = segment_size
595 fec = codec.CRSEncoder()
596 fec.set_params(self.segment_size,
597 self.required_shares, self.total_shares)
598 self.piece_size = fec.get_block_size()
601 if self.tail_segment_size == self.segment_size:
602 self.tail_fec = self.fec
604 tail_fec = codec.CRSEncoder()
605 tail_fec.set_params(self.tail_segment_size,
606 self.required_shares,
608 self.tail_fec = tail_fec
610 self._current_segment = self.starting_segment
611 self.end_segment = self.num_segments - 1
612 # Now figure out where the last segment should be.
613 if self.data.get_size() != self.datalength:
614 # We're updating a few segments in the middle of a mutable
615 # file, so we don't want to republish the whole thing.
616 # (we don't have enough data to do that even if we wanted
618 end = self.data.get_size()
619 self.end_segment = end // segment_size
620 if end % segment_size == 0:
621 self.end_segment -= 1
623 self.log("got start segment %d" % self.starting_segment)
624 self.log("got end segment %d" % self.end_segment)
627 def _push(self, ignored=None):
629 I manage state transitions. In particular, I see that we still
630 have a good enough number of writers to complete the upload
633 # Can we still successfully publish this file?
634 # TODO: Keep track of outstanding queries before aborting the
636 if len(self.writers) < self.required_shares or self.surprised:
637 return self._failure()
639 # Figure out what we need to do next. Each of these needs to
640 # return a deferred so that we don't block execution when this
641 # is first called in the upload method.
642 if self._state == PUSHING_BLOCKS_STATE:
643 return self.push_segment(self._current_segment)
645 elif self._state == PUSHING_EVERYTHING_ELSE_STATE:
646 return self.push_everything_else()
648 # If we make it to this point, we were successful in placing the
653 def push_segment(self, segnum):
654 if self.num_segments == 0 and self._version == SDMF_VERSION:
655 self._add_dummy_salts()
657 if segnum > self.end_segment:
658 # We don't have any more segments to push.
659 self._state = PUSHING_EVERYTHING_ELSE_STATE
662 d = self._encode_segment(segnum)
663 d.addCallback(self._push_segment, segnum)
664 def _increment_segnum(ign):
665 self._current_segment += 1
666 # XXX: I don't think we need to do addBoth here -- any errBacks
667 # should be handled within push_segment.
668 d.addCallback(_increment_segnum)
669 d.addCallback(self._turn_barrier)
670 d.addCallback(self._push)
671 d.addErrback(self._failure)
    def _turn_barrier(self, result):
        """
        I help the publish process avoid the recursion limit issues
        """
        # fireEventually reschedules the continuation on a later reactor
        # turn, passing `result` through unchanged, so long callback
        # chains do not grow the Python stack.
        return fireEventually(result)
    def _add_dummy_salts(self):
        """
        SDMF files need a salt even if they're empty, or the signature
        won't make sense. This method adds a dummy salt to each of our
        SDMF writers so that they can write the signature later.
        """
        salt = os.urandom(16)
        assert self._version == SDMF_VERSION

        # Every writer receives the same dummy salt.
        for writer in self.writers.itervalues():
            writer.put_salt(salt)
695 def _encode_segment(self, segnum):
697 I encrypt and encode the segment segnum.
699 started = time.time()
701 if segnum + 1 == self.num_segments:
702 segsize = self.tail_segment_size
704 segsize = self.segment_size
707 self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
708 data = self.data.read(segsize)
709 # XXX: This is dumb. Why return a list?
712 assert len(data) == segsize, len(data)
714 salt = os.urandom(16)
716 key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
717 self._status.set_status("Encrypting")
719 crypttext = enc.process(data)
720 assert len(crypttext) == len(data)
723 self._status.accumulate_encrypt_time(now - started)
727 if segnum + 1 == self.num_segments:
732 self._status.set_status("Encoding")
733 crypttext_pieces = [None] * self.required_shares
734 piece_size = fec.get_block_size()
735 for i in range(len(crypttext_pieces)):
736 offset = i * piece_size
737 piece = crypttext[offset:offset+piece_size]
738 piece = piece + "\x00"*(piece_size - len(piece)) # padding
739 crypttext_pieces[i] = piece
740 assert len(piece) == piece_size
741 d = fec.encode(crypttext_pieces)
742 def _done_encoding(res):
743 elapsed = time.time() - started
744 self._status.accumulate_encode_time(elapsed)
746 d.addCallback(_done_encoding)
750 def _push_segment(self, encoded_and_salt, segnum):
752 I push (data, salt) as segment number segnum.
754 results, salt = encoded_and_salt
755 shares, shareids = results
756 self._status.set_status("Pushing segment")
757 for i in xrange(len(shares)):
758 sharedata = shares[i]
759 shareid = shareids[i]
760 if self._version == MDMF_VERSION:
761 hashed = salt + sharedata
764 block_hash = hashutil.block_hash(hashed)
765 self.blockhashes[shareid][segnum] = block_hash
766 # find the writer for this share
767 writer = self.writers[shareid]
768 writer.put_block(sharedata, segnum, salt)
771 def push_everything_else(self):
773 I put everything else associated with a share.
775 self._pack_started = time.time()
776 self.push_encprivkey()
777 self.push_blockhashes()
778 self.push_sharehashes()
779 self.push_toplevel_hashes_and_signature()
780 d = self.finish_publishing()
781 def _change_state(ignored):
782 self._state = DONE_STATE
783 d.addCallback(_change_state)
784 d.addCallback(self._push)
788 def push_encprivkey(self):
789 encprivkey = self._encprivkey
790 self._status.set_status("Pushing encrypted private key")
791 for writer in self.writers.itervalues():
792 writer.put_encprivkey(encprivkey)
    def push_blockhashes(self):
        # Build the Merkle hash tree over each share's block hashes and
        # hand the full tree to that share's writer. Each tree's root is
        # saved in self.sharehash_leaves, which push_sharehashes() later
        # uses as the leaves of the share hash tree.
        self.sharehash_leaves = [None] * len(self.blockhashes)
        self._status.set_status("Building and pushing block hash tree")
        for shnum, blockhashes in self.blockhashes.iteritems():
            t = hashtree.HashTree(blockhashes)
            # replace the raw leaf list with the full flattened tree
            self.blockhashes[shnum] = list(t)
            # set the leaf for future use.
            self.sharehash_leaves[shnum] = t[0]

            writer = self.writers[shnum]
            writer.put_blockhashes(self.blockhashes[shnum])
808 def push_sharehashes(self):
809 self._status.set_status("Building and pushing share hash chain")
810 share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
811 for shnum in xrange(len(self.sharehash_leaves)):
812 needed_indices = share_hash_tree.needed_hashes(shnum)
813 self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
814 for i in needed_indices] )
815 writer = self.writers[shnum]
816 writer.put_sharehashes(self.sharehashes[shnum])
817 self.root_hash = share_hash_tree[0]
820 def push_toplevel_hashes_and_signature(self):
821 # We need to to three things here:
822 # - Push the root hash and salt hash
823 # - Get the checkstring of the resulting layout; sign that.
824 # - Push the signature
825 self._status.set_status("Pushing root hashes and signature")
826 for shnum in xrange(self.total_shares):
827 writer = self.writers[shnum]
828 writer.put_root_hash(self.root_hash)
829 self._update_checkstring()
830 self._make_and_place_signature()
    def _update_checkstring(self):
        """
        After putting the root hash, MDMF files will have the
        checkstring written to the storage server. This means that we
        can update our copy of the checkstring so we can detect
        uncoordinated writes. SDMF files will have the same checkstring,
        so we need not do anything.
        """
        # All writers carry the same checkstring, so any one will do.
        self._checkstring = self.writers.values()[0].get_checkstring()
    def _make_and_place_signature(self):
        """
        I create and place the signature.
        """
        started = time.time()
        self._status.set_status("Signing prefix")
        # The signed prefix is identical across shares, so one signature
        # (computed from writer 0's signable) is placed on every writer.
        signable = self.writers[0].get_signable()
        self.signature = self._privkey.sign(signable)

        for (shnum, writer) in self.writers.iteritems():
            writer.put_signature(self.signature)
        self._status.timings['sign'] = time.time() - started
858 def finish_publishing(self):
859 # We're almost done -- we just need to put the verification key
861 started = time.time()
862 self._status.set_status("Pushing shares")
863 self._started_pushing = started
865 verification_key = self._pubkey.serialize()
867 for (shnum, writer) in self.writers.copy().iteritems():
868 writer.put_verification_key(verification_key)
869 self.num_outstanding += 1
870 def _no_longer_outstanding(res):
871 self.num_outstanding -= 1
874 d = writer.finish_publishing()
875 d.addBoth(_no_longer_outstanding)
876 d.addErrback(self._connection_problem, writer)
877 d.addCallback(self._got_write_answer, writer, started)
879 self._record_verinfo()
880 self._status.timings['pack'] = time.time() - started
881 return defer.DeferredList(ds)
    def _record_verinfo(self):
        # Record the verinfo tuple for the version we just published,
        # taken from an arbitrary writer (they all hold the same version).
        self.versioninfo = self.writers.values()[0].get_verinfo()
888 def _connection_problem(self, f, writer):
890 We ran into a connection problem while working with writer, and
891 need to deal with that.
893 self.log("found problem: %s" % str(f))
894 self._last_failure = f
895 del(self.writers[writer.shnum])
898 def log_goal(self, goal, message=""):
900 for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]):
901 logmsg.append("sh%d to [%s]" % (shnum,
902 idlib.shortnodeid_b2a(peerid)))
903 self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
904 self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
907 def update_goal(self):
908 # if log.recording_noisy
910 self.log_goal(self.goal, "before update: ")
912 # first, remove any bad peers from our goal
913 self.goal = set([ (peerid, shnum)
914 for (peerid, shnum) in self.goal
915 if peerid not in self.bad_peers ])
917 # find the homeless shares:
918 homefull_shares = set([shnum for (peerid, shnum) in self.goal])
919 homeless_shares = set(range(self.total_shares)) - homefull_shares
920 homeless_shares = sorted(list(homeless_shares))
921 # place them somewhere. We prefer unused servers at the beginning of
922 # the available peer list.
924 if not homeless_shares:
927 # if an old share X is on a node, put the new share X there too.
928 # TODO: 1: redistribute shares to achieve one-per-peer, by copying
929 # shares from existing peers to new (less-crowded) ones. The
930 # old shares must still be updated.
931 # TODO: 2: move those shares instead of copying them, to reduce future
934 # this is a bit CPU intensive but easy to analyze. We create a sort
935 # order for each peerid. If the peerid is marked as bad, we don't
936 # even put them in the list. Then we care about the number of shares
937 # which have already been assigned to them. After that we care about
938 # their permutation order.
939 old_assignments = DictOfSets()
940 for (peerid, shnum) in self.goal:
941 old_assignments.add(peerid, shnum)
944 for i, (peerid, ss) in enumerate(self.full_peerlist):
945 if peerid in self.bad_peers:
947 entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
948 peerlist.append(entry)
952 raise NotEnoughServersError("Ran out of non-bad servers, "
954 str(self._first_write_error),
955 self._first_write_error)
957 # we then index this peerlist with an integer, because we may have to
958 # wrap. We update the goal as we go.
960 for shnum in homeless_shares:
961 (ignored1, ignored2, peerid, ss) = peerlist[i]
962 # if we are forced to send a share to a server that already has
963 # one, we may have two write requests in flight, and the
964 # servermap (which was computed before either request was sent)
965 # won't reflect the new shares, so the second response will be
966 # surprising. There is code in _got_write_answer() to tolerate
967 # this, otherwise it would cause the publish to fail with an
968 # UncoordinatedWriteError. See #546 for details of the trouble
969 # this used to cause.
970 self.goal.add( (peerid, shnum) )
971 self.connections[peerid] = ss
973 if i >= len(peerlist):
976 self.log_goal(self.goal, "after update: ")
    def _got_write_answer(self, answer, writer, started):
        # Process one server's response to our testv-and-writev request:
        # record per-server timing, detect "surprise" shares (evidence of
        # an uncoordinated write), log testv failures, and update the
        # servermap for successfully placed shares.
        #
        # NOTE(review): this chunk of the file is damaged -- the embedded
        # original line numbering is discontinuous. Several statements the
        # visible code depends on are missing: the early return for fake
        # SDMF writes, the binding of 'now' (presumably now = time.time()),
        # the 'pass'/'else:' arms of the surprise-share classification,
        # the 'if surprised:' and 'if not wrote:' guards, and the
        # multi-line tuple-unpack continuations. Gaps are marked below.

        # SDMF writers only pretend to write when readers set their
        # blocks, salts, and so on -- they actually just write once,
        # at the end of the upload process. In fake writes, they
        # return defer.succeed(None). If we see that, we shouldn't
        # bother checking it.

        peerid = writer.peerid
        lp = self.log("_got_write_answer from %s, share %d" %
                      (idlib.shortnodeid_b2a(peerid), writer.shnum))

        # NOTE(review): 'now' is bound on a missing line, presumably
        # now = time.time().
        elapsed = now - started

        self._status.add_per_server_time(peerid, elapsed)

        wrote, read_data = answer

        surprise_shares = set(read_data.keys()) - set([writer.shnum])

        # We need to remove from surprise_shares any shares that we are
        # knowingly also writing to that peer from other writers.

        # TODO: Precompute this.
        known_shnums = [x.shnum for x in self.writers.values()
                        if x.peerid == peerid]
        surprise_shares -= set(known_shnums)
        self.log("found the following surprise shares: %s" %
                 str(surprise_shares))

        # Now surprise shares contains all of the shares that we did not
        # expect to be there.

        for shnum in surprise_shares:
            # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
            checkstring = read_data[shnum][0]
            # What we want to do here is to see if their (seqnum,
            # roothash, salt) is the same as our (seqnum, roothash,
            # salt), or the equivalent for MDMF. The best way to do this
            # is to store a packed representation of our checkstring
            # somewhere, then not bother unpacking the other
            if checkstring == self._checkstring:
                # they have the right share, somehow
                if (peerid,shnum) in self.goal:
                    # and we want them to have it, so we probably sent them a
                    # copy in an earlier write. This is ok, and avoids the
                    # NOTE(review): the body of this branch and the
                    # 'else:' line introducing the next case are missing.

                    # They aren't in our goal, but they are still for the right
                    # version. Somebody else wrote them, and it's a convergent
                    # uncoordinated write. Pretend this is ok (don't be
                    # surprised), since I suspect there's a decent chance that
                    # we'll hit this in normal operation.

                # NOTE(review): missing outer 'else:' here -- the comments
                # and code below classify shares of a *different* version.
                # the new shares are of a different version
                if peerid in self._servermap.reachable_peers:
                    # we asked them about their shares, so we had knowledge
                    # of what they used to have. Any surprising shares must
                    # have come from someone else, so UCW.

                    # NOTE(review): missing 'else:' arm here.
                    # we didn't ask them, and now we've discovered that they
                    # have a share we didn't know about. This indicates that
                    # mapupdate should have worked harder and asked more
                    # servers before concluding that it knew about them all.

                    # signal UCW, but make sure to ask this peer next time,
                    # so we'll remember to update it if/when we retry.

                    # TODO: ask this peer next time. I don't yet have a good
                    # way to do this. Two insufficient possibilities are:

                    # self._servermap.add_new_share(peerid, shnum, verinfo, now)
                    # but that requires fetching/validating/parsing the whole
                    # version string, and all we have is the checkstring
                    # self._servermap.mark_bad_share(peerid, shnum, checkstring)
                    # that will make publish overwrite the share next time,
                    # but it won't re-query the server, and it won't make
                    # mapupdate search further

                    # TODO later: when publish starts, do
                    # servermap.get_best_version(), extract the seqnum,
                    # subtract one, and store as highest-replaceable-seqnum.
                    # Then, if this surprise-because-we-didn't-ask share is
                    # of highest-replaceable-seqnum or lower, we're allowed
                    # to replace it: send out a new writev (or rather add it
                    # to self.goal and loop).

            # NOTE(review): the 'if surprised:' guard around this log +
            # flag-set appears to be missing.
            self.log("they had shares %s that we didn't know about" %
                     (list(surprise_shares),),
                     parent=lp, level=log.WEIRD, umid="un9CSQ")
            self.surprised = True

        # NOTE(review): missing 'if not wrote:' guard before this branch.
        # TODO: there are two possibilities. The first is that the server
        # is full (or just doesn't want to give us any room), which means
        # we shouldn't ask them again, but is *not* an indication of an
        # uncoordinated write. The second is that our testv failed, which
        # *does* indicate an uncoordinated write. We currently don't have
        # a way to tell these two apart (in fact, the storage server code
        # doesn't have the option of refusing our share).

        # If the server is full, mark the peer as bad (so we don't ask
        # them again), but don't set self.surprised. The loop() will find

        # If the testv failed, log it, set self.surprised, but don't
        # bother adding to self.bad_peers .

            self.log("our testv failed, so the write did not happen",
                     parent=lp, level=log.WEIRD, umid="8sc26g")
            self.surprised = True
            self.bad_peers.add(writer) # don't ask them again
            # use the checkstring to add information to the log message
            unknown_format = False
            for (shnum,readv) in read_data.items():
                checkstring = readv[0]
                version = get_version_from_checkstring(checkstring)
                if version == MDMF_VERSION:
                    # NOTE(review): leading lines of this tuple-unpack
                    # (e.g. '(other_seqnum,') are missing.
                     other_roothash) = unpack_mdmf_checkstring(checkstring)
                elif version == SDMF_VERSION:
                    # NOTE(review): leading lines of this tuple-unpack
                    # are missing.
                     other_IV) = unpack_sdmf_checkstring(checkstring)
                # NOTE(review): missing 'else:' before this flag-set.
                    unknown_format = True
                expected_version = self._servermap.version_on_peer(peerid,
                # NOTE(review): continuation argument (writer/shnum) missing.
                if expected_version:
                    (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                     offsets_tuple) = expected_version
                    msg = ("somebody modified the share on us:"
                           " shnum=%d: I thought they had #%d:R=%s," %
                           seqnum, base32.b2a(root_hash)[:4]))
                    # NOTE(review): missing 'if unknown_format:' guard.
                        msg += (" but I don't know how to read share"
                                " format %d" % version)
                    # NOTE(review): missing 'else:' here.
                        msg += " but testv reported #%d:R=%s" % \
                               (other_seqnum, other_roothash)
                    self.log(msg, parent=lp, level=log.NOISY)
                # if expected_version==None, then we didn't expect to see a
                # share on that peer, and the 'surprise_shares' clause above
                # will have logged it.

        # and update the servermap
        # self.versioninfo is set during the last phase of publishing.
        # If we get there, we know that responses correspond to placed
        # shares, and can safely execute these statements.
        if self.versioninfo:
            self.log("wrote successfully: adding new share to servermap")
            self._servermap.add_new_share(peerid, writer.shnum,
                                          self.versioninfo, started)
            self.placed.add( (peerid, writer.shnum) )
        self._update_status()
        # the next method in the deferred chain will check to see if
        # we're done and successful.
        # Completion path for a successful publish: stop the clock,
        # record 'total' and 'push' timings on the status object, mark it
        # finished, hand downloader hints (k and segsize) to the node,
        # and fire done_deferred with None (success).
        #
        # NOTE(review): the enclosing 'def' line for this method
        # (presumably 'def _done(self):') is missing from this chunk,
        # as are the early 'return' under the not-running guard, the
        # binding of 'now' (presumably time.time()), and the creation of
        # the 'hints' dict.
        if not self._running:
        self._running = False
        self._status.timings["total"] = now - self._started
        elapsed = now - self._started_pushing
        self._status.timings['push'] = elapsed
        self._status.set_active(False)
        self.log("Publish done, success")
        self._status.set_status("Finished")
        self._status.set_progress(1.0)
        # Get k and segsize, then give them to the caller.
        hints['segsize'] = self.segment_size
        hints['k'] = self.required_shares
        self._node.set_downloader_hints(hints)
        eventually(self.done_deferred.callback, None)
    def _failure(self, f=None):
        """
        Handle a failed publish: fire done_deferred with a Failure
        wrapping either NotEnoughServersError (we ran out of usable
        servers without being surprised) or UncoordinatedWriteError
        (we saw surprise shares, i.e. someone else wrote concurrently).

        NOTE(review): interior lines are missing from this chunk --
        presumably an 'if f:' guard around the first assignment, a
        self.log(msg) call after building the message, and the 'else:'
        line introducing the UncoordinatedWriteError branch.
        """
        self._last_failure = f

        if not self.surprised:
            # We ran out of servers
            msg = "Publish ran out of good servers"
            if self._last_failure:
                msg += ", last failure was: %s" % str(self._last_failure)

            e = NotEnoughServersError(msg)

            # NOTE(review): missing 'else:' here -- the following lines
            # belong to the surprised (uncoordinated-write) branch.
            # We ran into shares that we didn't recognize, which means
            # that we need to return an UncoordinatedWriteError.
            self.log("Publish failed with UncoordinatedWriteError")
            e = UncoordinatedWriteError()
        f = failure.Failure(e)
        eventually(self.done_deferred.callback, f)
class MutableFileHandle:
    """
    I am a mutable uploadable built around a filehandle-like object,
    usually either a StringIO instance or a handle to an actual file.
    """
    # NOTE(review): this chunk of the file is damaged -- several interior
    # lines are missing: docstring delimiters, 'self._marker = 0' at the
    # end of __init__, the 'def' lines of get_size/pos/close, and the
    # 'return' statements of get_size/pos/read. Orphaned docstring text
    # is preserved below as comments.

    implements(IMutableUploadable)

    def __init__(self, filehandle):
        # The filehandle is defined as a generally file-like object that
        # has these two methods. We don't care beyond that.
        assert hasattr(filehandle, "read")
        assert hasattr(filehandle, "close")

        self._filehandle = filehandle
        # We must start reading at the beginning of the file, or we risk
        # encountering errors when the data read does not match the size
        # reported to the uploader.
        self._filehandle.seek(0)

        # We have not yet read anything, so our position is 0.
        # NOTE(review): 'self._marker = 0' is missing here; read() below
        # increments self._marker.

    # NOTE(review): 'def get_size(self):' is missing here. Its docstring
    # read: "I return the amount of data in my filehandle." The size is
    # computed lazily, once, and cached in self._size.
        if not hasattr(self, "_size"):
            old_position = self._filehandle.tell()
            # Seek to the end of the file by seeking 0 bytes from the
            # end of it (whence=2).
            self._filehandle.seek(0, 2) # 2 == os.SEEK_END in 2.5+
            self._size = self._filehandle.tell()
            # Restore the previous position, in case this was called
            # while a read was in progress.
            self._filehandle.seek(old_position)
            assert self._filehandle.tell() == old_position

        assert hasattr(self, "_size")
        # NOTE(review): 'return self._size' is missing here.

    # NOTE(review): 'def pos(self):' is missing here. Its docstring read:
    #     I return the position of my read marker -- i.e., how much data I
    #     have already read and returned to callers.
    # Its 'return self._marker' is also missing.

    def read(self, length):
        """
        I return some data (up to length bytes) from my filehandle.

        In most cases, I return length bytes, but sometimes I won't --
        for example, if I am asked to read beyond the end of a file, or
        """
        # NOTE(review): the tail of the docstring above is missing.
        results = self._filehandle.read(length)
        self._marker += len(results)
        # NOTE(review): 'return results' is missing here.

    # NOTE(review): 'def close(self):' is missing here. Its docstring read:
    #     I close the underlying filehandle. Any further operations on the
    #     filehandle fail at this point.
        self._filehandle.close()
class MutableData(MutableFileHandle):
    """
    I am a mutable uploadable that wraps a plain string: the string is
    turned into a StringIO instance, and everything else is inherited
    from the filehandle-based MutableFileHandle machinery.
    """

    def __init__(self, s):
        # Only accept real strings; wrap them in a StringIO so the
        # parent class can treat the data as a seekable filehandle.
        assert isinstance(s, str)

        MutableFileHandle.__init__(self, StringIO(s))
class TransformingUploadable:
    """
    I am an IMutableUploadable that wraps another IMutableUploadable,
    and some segments that are already on the grid. When I am called to
    read, I handle merging of boundary segments.
    """
    implements(IMutableUploadable)


    def __init__(self, data, offset, segment_size, start, end):
        # data: the replacement IMutableUploadable
        # offset: byte offset into the file where the new data goes
        # segment_size: segment size of the existing file
        # start/end: the pre-existing first/last boundary segments
        assert IMutableUploadable.providedBy(data)

        self._newdata = data
        self._offset = offset
        self._segment_size = segment_size
        # NOTE(review): lines are missing here -- presumably
        # 'self._start = start' and 'self._end = end', since read()
        # slices self._start and self._end.
        self._read_marker = 0

        # Offset of the new data within its first (partially
        # overwritten) segment.
        self._first_segment_offset = offset % segment_size

        num = self.log("TransformingUploadable: starting", parent=None)
        self._log_number = num
        self.log("got fso: %d" % self._first_segment_offset)
        self.log("got offset: %d" % self._offset)
1308 def log(self, *args, **kwargs):
1309 if 'parent' not in kwargs:
1310 kwargs['parent'] = self._log_number
1311 if "facility" not in kwargs:
1312 kwargs["facility"] = "tahoe.mutable.transforminguploadable"
1313 return log.msg(*args, **kwargs)
1317 return self._offset + self._newdata.get_size()
    def read(self, length):
        """
        Return up to 'length' bytes, stitched together from (1) the head
        of the first pre-existing segment, (2) the replacement data, and
        (3) the tail of the last pre-existing segment.

        NOTE(review): interior lines are missing from this chunk --
        notably the initializations 'old_start_data = ""' and
        'old_end_data = ""' and the 'else: old_data_length = 0' arm,
        all of which the visible code relies on.
        """
        # We can get data from 3 sources here.
        #   1. The first of the segments provided to us.
        #   2. The data that we're replacing things with.
        #   3. The last of the segments provided to us.

        # are we in state 0?
        self.log("reading %d bytes" % length)

        # NOTE(review): 'old_start_data = ""' is missing here.
        old_data_length = self._first_segment_offset - self._read_marker
        if old_data_length > 0:
            if old_data_length > length:
                old_data_length = length
            self.log("returning %d bytes of old start data" % old_data_length)

            old_data_end = old_data_length + self._read_marker
            old_start_data = self._start[self._read_marker:old_data_end]
            length -= old_data_length
            # NOTE(review): this comment belonged to a missing
            # 'else: old_data_length = 0' arm --
            # otherwise calculations later get screwed up.

        # Is there enough new data to satisfy this read? If not, we need
        # to pad the end of the data with data from our last segment.
        old_end_length = length - \
            (self._newdata.get_size() - self._newdata.pos())
        # NOTE(review): 'old_end_data = ""' is missing here.
        if old_end_length > 0:
            self.log("reading %d bytes of old end data" % old_end_length)

            # TODO: We're not explicitly checking for tail segment size
            # here. Is that a problem?
            old_data_offset = (length - old_end_length + \
                               old_data_length) % self._segment_size
            self.log("reading at offset %d" % old_data_offset)
            old_end = old_data_offset + old_end_length
            old_end_data = self._end[old_data_offset:old_end]
            length -= old_end_length
            assert length == self._newdata.get_size() - self._newdata.pos()

        self.log("reading %d bytes of new data" % length)
        new_data = self._newdata.read(length)
        new_data = "".join(new_data)

        # Advance the marker by everything we are about to hand back.
        self._read_marker += len(old_start_data + new_data + old_end_data)

        return old_start_data + new_data + old_end_data