import os, time
from StringIO import StringIO
from itertools import count
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION, \
                                 IMutableUploadable
from allmydata.util import base32, hashutil, mathutil, log
from allmydata.util.dictutil import DictOfSets
from allmydata import hashtree, codec
from allmydata.storage.server import si_b2a
from pycryptopp.cipher.aes import AES
from foolscap.api import eventually, fireEventually

from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, \
     UncoordinatedWriteError, NotEnoughServersError
from allmydata.mutable.servermap import ServerMap
from allmydata.mutable.layout import get_version_from_checkstring,\
                                     unpack_mdmf_checkstring, \
                                     unpack_sdmf_checkstring, \
                                     MDMFSlotWriteProxy, \
                                     SDMFSlotWriteProxy

KiB = 1024
DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
PUSHING_BLOCKS_STATE = 0
PUSHING_EVERYTHING_ELSE_STATE = 1
DONE_STATE = 2
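
# A sketch of the publish state machine, inferred from _push() below (not a
# comment from the original source): a Publish instance moves through these
# states in order, with a reactor turn between segment pushes:
#
#   PUSHING_BLOCKS_STATE -> PUSHING_EVERYTHING_ELSE_STATE -> DONE_STATE
#
# push_segment() runs once per segment in the first state,
# push_everything_else() writes hashes, signature, and keys in the second,
# and _done() fires self.done_deferred once every (server, shnum) pair in
# self.goal has been placed.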

class PublishStatus:
    implements(IPublishStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["send_per_server"] = {}
        self.timings["encrypt"] = 0.0
        self.timings["encode"] = 0.0
        self.servermap = None
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        self.encoding = ("?", "?")
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def add_per_server_time(self, server, elapsed):
        serverid = server.get_serverid()
        if serverid not in self.timings["send_per_server"]:
            self.timings["send_per_server"][serverid] = []
        self.timings["send_per_server"][serverid].append(elapsed)
    def accumulate_encode_time(self, elapsed):
        self.timings["encode"] += elapsed
    def accumulate_encrypt_time(self, elapsed):
        self.timings["encrypt"] += elapsed
    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_encoding(self):
        return self.encoding
    def using_helper(self):
        return self.helper
    def get_servermap(self):
        return self.servermap
    def get_size(self):
        return self.size
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter
    def get_problems(self):
        return self.problems

    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
        self.helper = helper
    def set_servermap(self, servermap):
        self.servermap = servermap
    def set_encoding(self, k, n):
        self.encoding = (k, n)
    def set_size(self, size):
        self.size = size
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value

class LoopLimitExceededError(Exception):
    pass

class Publish:
    """I represent a single act of publishing the mutable file to the grid. I
    will only publish my data if the servermap I am using still represents
    the current state of the world.

    To make the initial publish, set servermap to None.
    """

    def __init__(self, filenode, storage_broker, servermap):
        self._node = filenode
        self._storage_broker = storage_broker
        self._servermap = servermap
        self._storage_index = self._node.get_storage_index()
        self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
        num = self.log("Publish(%s): starting" % prefix, parent=None)
        self._log_number = num
        self._running = True
        self._first_write_error = None
        self._last_failure = None

        self._status = PublishStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)
        self._version = self._node.get_version()
        assert self._version in (SDMF_VERSION, MDMF_VERSION)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if 'parent' not in kwargs:
            kwargs['parent'] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.publish"
        return log.msg(*args, **kwargs)

    def update(self, data, offset, blockhashes, version):
        """
        I replace the contents of this file with the contents of data,
        starting at offset. I return a Deferred that fires with None
        when the replacement has been completed, or with an error if
        something went wrong during the process.

        Note that this process will not upload new shares. If the file
        being updated is in need of repair, callers will have to repair
        it on their own.
        """
        # 1: Make server assignments. We'll assign each share that we know
        #    about on the grid to the server that currently holds it, and
        #    will not place any new shares.
        # 2: Setup encoding parameters. Most of these will stay the same
        #    -- datalength will change, as will some of the offsets.
        # 3. Upload the new segments.
        assert IMutableUploadable.providedBy(data)

        self.data = data
        # XXX: Use the MutableFileVersion instead.
        self.datalength = self._node.get_size()
        if data.get_size() > self.datalength:
            self.datalength = data.get_size()

        self.log("starting update")
        self.log("adding new data of length %d at offset %d" % \
                 (data.get_size(), offset))
        self.log("new data length is %d" % self.datalength)
        self._status.set_size(self.datalength)
        self._status.set_status("Started")
        self._started = time.time()

        self.done_deferred = defer.Deferred()

        self._writekey = self._node.get_writekey()
        assert self._writekey, "need write capability to publish"

        # first, which servers will we publish to? We require that the
        # servermap was updated in MODE_WRITE, so we can depend upon the
        # serverlist computed by that process instead of computing our own.
        assert self._servermap
        assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK)
        # we will push a version that is one larger than anything present
        # in the grid, according to the servermap.
        self._new_seqnum = self._servermap.highest_seqnum() + 1
        self._status.set_servermap(self._servermap)

        self.log(format="new seqnum will be %(seqnum)d",
                 seqnum=self._new_seqnum, level=log.NOISY)

        # We're updating an existing file, so all of the following
        # should be available.
        self.readkey = self._node.get_readkey()
        self.required_shares = self._node.get_required_shares()
        assert self.required_shares is not None
        self.total_shares = self._node.get_total_shares()
        assert self.total_shares is not None
        self._status.set_encoding(self.required_shares, self.total_shares)

        self._pubkey = self._node.get_pubkey()
        assert self._pubkey
        self._privkey = self._node.get_privkey()
        assert self._privkey
        self._encprivkey = self._node.get_encprivkey()

        sb = self._storage_broker
        full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
        self.full_serverlist = full_serverlist # for use later, immutable
        self.bad_servers = set() # servers who have errbacked/refused requests

        # This will set self.segment_size, self.num_segments, and
        # self.fec. TODO: Does it know how to do the offset? Probably
        # not. So do that part next.
        self.setup_encoding_parameters(offset=offset)

        # if we experience any surprises (writes which were rejected because
        # our test vector did not match, or shares which we didn't expect to
        # see), we set this flag and report an UncoordinatedWriteError at the
        # end of the publish process.
        self.surprised = False

        # we keep track of three tables. The first is our goal: which share
        # we want to see on which servers. This is initially populated by the
        # existing servermap.
        self.goal = set() # pairs of (server, shnum) tuples

        # the second is the number of outstanding queries: those that are in
        # flight and may or may not be delivered, accepted, or acknowledged.
        # This is incremented when a query is sent, and decremented when the
        # response returns or errbacks.
        self.num_outstanding = 0

        # the third is a table of successes: shares which have actually been
        # placed. These are populated when responses come back with success.
        # When self.placed == self.goal, we're done.
        self.placed = set() # (server, shnum) tuples

        self.bad_share_checkstrings = {}

        # This is set at the last step of the publishing process.
        self.versioninfo = ""

        # we use the servermap to populate the initial goal: this way we will
        # try to update each existing share in place. Since we're
        # updating, we ignore damaged and missing shares -- callers must
        # do a repair to recreate these.
        self.goal = set(self._servermap.get_known_shares())

        # shnum -> the write proxy for that share
        self.writers = {}

        # SDMF files are updated differently.
        self._version = MDMF_VERSION
        writer_class = MDMFSlotWriteProxy

        # For each (server, shnum) in self.goal, we make a
        # write proxy for that server. We'll use this to write
        # shares to the server.
        for (server,shnum) in self.goal:
            write_enabler = self._node.get_write_enabler(server)
            renew_secret = self._node.get_renewal_secret(server)
            cancel_secret = self._node.get_cancel_secret(server)
            secrets = (write_enabler, renew_secret, cancel_secret)

            self.writers[shnum] = writer_class(shnum,
                                               server.get_rref(),
                                               self._storage_index,
                                               secrets,
                                               self._new_seqnum,
                                               self.required_shares,
                                               self.total_shares,
                                               self.segment_size,
                                               self.datalength)
            self.writers[shnum].server = server
            known_shares = self._servermap.get_known_shares()
            assert (server, shnum) in known_shares
            old_versionid, old_timestamp = known_shares[(server,shnum)]
            (old_seqnum, old_root_hash, old_salt, old_segsize,
             old_datalength, old_k, old_N, old_prefix,
             old_offsets_tuple) = old_versionid
            self.writers[shnum].set_checkstring(old_seqnum,
                                                old_root_hash,
                                                old_salt)

        # Our remote shares will not have a complete checkstring until
        # after we are done writing share data and have started to write
        # blocks. In the meantime, we need to know what to look for when
        # writing, so that we can detect UncoordinatedWriteErrors.
        self._checkstring = self.writers.values()[0].get_checkstring()
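
        # Hedged illustration of what the checkstring covers (the
        # authoritative packing lives in allmydata.mutable.layout, not
        # here): for MDMF it is roughly
        #     struct.pack(">BQ32s", 1, seqnum, root_hash)
        # i.e. a version byte, the sequence number, and the root hash,
        # while SDMF appends the 16-byte IV. Comparing the packed strings
        # directly lets _got_write_answer() detect a competing writer
        # without unpacking anything.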

        # Now, we start pushing shares.
        self._status.timings["setup"] = time.time() - self._started
        # First, we encrypt, encode, and publish the shares that we need
        # to encrypt, encode, and publish.

        # Our update process fetched these for us. We need to update
        # them in place as publishing happens.
        self.blockhashes = {} # shnum -> [blockhashes]
        for (i, bht) in blockhashes.iteritems():
            # We need to extract the leaves from our old hash tree.
            # version[4] is the old datalength; version[3] the old segsize.
            old_segcount = mathutil.div_ceil(version[4],
                                             version[3])
            h = hashtree.IncompleteHashTree(old_segcount)
            bht = dict(enumerate(bht))
            h.set_hashes(leaves=bht)
            leaves = h[h.get_leaf_index(0):]
            for j in xrange(self.num_segments - len(leaves)):
                leaves.append(None)

            assert len(leaves) >= self.num_segments
            self.blockhashes[i] = leaves
            # This list will now be the leaves that were set during the
            # initial upload + enough empty hashes to make it a
            # power-of-two. If we exceed a power of two boundary, we
            # should be encoding the file over again, and should not be
            # here.
            #assert len(self.blockhashes[i]) == \
            #       hashtree.roundup_pow2(self.num_segments), \
            #       len(self.blockhashes[i])
            # XXX: Except this doesn't work. Figure out why.

        # These are filled in later, after we've modified the block hash
        # tree suitably.
        self.sharehash_leaves = None # eventually [sharehashes]
        self.sharehashes = {} # shnum -> [sharehash leaves necessary to
                              #           validate the share]

        self.log("Starting push")

        self._state = PUSHING_BLOCKS_STATE
        self._push()

        return self.done_deferred
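
    # A hedged usage sketch for update() (hypothetical call site; the real
    # callers live in the mutable filenode code):
    #
    #   p = Publish(filenode, storage_broker, servermap)
    #   d = p.update(MutableData("fresh bytes"), offset=0,
    #                blockhashes=old_blockhashes, version=old_verinfo)
    #   d.addCallback(lambda ign: log.msg("in-place update finished"))
    #
    # where old_blockhashes and old_verinfo come from the version-fetching
    # machinery that drives the update.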

    def publish(self, newdata):
        """Publish the filenode's current contents. Returns a Deferred that
        fires (with None) when the publish has done as much work as it's ever
        going to do, or errbacks with ConsistencyError if it detects a
        simultaneous write.
        """
        # 0. Setup encoding parameters, encoder, and other such things.
        # 1. Encrypt, encode, and publish segments.
        assert IMutableUploadable.providedBy(newdata)

        self.data = newdata
        self.datalength = newdata.get_size()
        #if self.datalength >= DEFAULT_MAX_SEGMENT_SIZE:
        #    self._version = MDMF_VERSION
        #else:
        #    self._version = SDMF_VERSION

        self.log("starting publish, datalen is %s" % self.datalength)
        self._status.set_size(self.datalength)
        self._status.set_status("Started")
        self._started = time.time()

        self.done_deferred = defer.Deferred()

        self._writekey = self._node.get_writekey()
        assert self._writekey, "need write capability to publish"

        # first, which servers will we publish to? We require that the
        # servermap was updated in MODE_WRITE, so we can depend upon the
        # serverlist computed by that process instead of computing our own.
        if self._servermap:
            assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK)
            # we will push a version that is one larger than anything present
            # in the grid, according to the servermap.
            self._new_seqnum = self._servermap.highest_seqnum() + 1
        else:
            # If we don't have a servermap, that's because we're doing the
            # initial publish
            self._new_seqnum = 1
            self._servermap = ServerMap()
        self._status.set_servermap(self._servermap)

        self.log(format="new seqnum will be %(seqnum)d",
                 seqnum=self._new_seqnum, level=log.NOISY)

        # having an up-to-date servermap (or using a filenode that was just
        # created for the first time) also guarantees that the following
        # fields are available
        self.readkey = self._node.get_readkey()
        self.required_shares = self._node.get_required_shares()
        assert self.required_shares is not None
        self.total_shares = self._node.get_total_shares()
        assert self.total_shares is not None
        self._status.set_encoding(self.required_shares, self.total_shares)

        self._pubkey = self._node.get_pubkey()
        assert self._pubkey
        self._privkey = self._node.get_privkey()
        assert self._privkey
        self._encprivkey = self._node.get_encprivkey()

        sb = self._storage_broker
        full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
        self.full_serverlist = full_serverlist # for use later, immutable
        self.bad_servers = set() # servers who have errbacked/refused requests

        # This will set self.segment_size, self.num_segments, and
        # self.fec.
        self.setup_encoding_parameters()

        # if we experience any surprises (writes which were rejected because
        # our test vector did not match, or shares which we didn't expect to
        # see), we set this flag and report an UncoordinatedWriteError at the
        # end of the publish process.
        self.surprised = False

        # we keep track of three tables. The first is our goal: which share
        # we want to see on which servers. This is initially populated by the
        # existing servermap.
        self.goal = set() # pairs of (server, shnum) tuples

        # the second is the number of outstanding queries: those that are in
        # flight and may or may not be delivered, accepted, or acknowledged.
        # This is incremented when a query is sent, and decremented when the
        # response returns or errbacks.
        self.num_outstanding = 0

        # the third is a table of successes: shares which have actually been
        # placed. These are populated when responses come back with success.
        # When self.placed == self.goal, we're done.
        self.placed = set() # (server, shnum) tuples

        self.bad_share_checkstrings = {}

        # This is set at the last step of the publishing process.
        self.versioninfo = ""

        # we use the servermap to populate the initial goal: this way we will
        # try to update each existing share in place.
        self.goal = set(self._servermap.get_known_shares())

        # then we add in all the shares that were bad (corrupted, bad
        # signatures, etc). We want to replace these.
        for key, old_checkstring in self._servermap.get_bad_shares().items():
            (server, shnum) = key
            self.goal.add( (server,shnum) )
            self.bad_share_checkstrings[(server,shnum)] = old_checkstring

        # TODO: Make this part do server selection.
        self.update_goal()

        # shnum -> the write proxy for that share
        self.writers = {}
        if self._version == MDMF_VERSION:
            writer_class = MDMFSlotWriteProxy
        else:
            writer_class = SDMFSlotWriteProxy

        # For each (server, shnum) in self.goal, we make a
        # write proxy for that server. We'll use this to write
        # shares to the server.
        for (server,shnum) in self.goal:
            write_enabler = self._node.get_write_enabler(server)
            renew_secret = self._node.get_renewal_secret(server)
            cancel_secret = self._node.get_cancel_secret(server)
            secrets = (write_enabler, renew_secret, cancel_secret)

            self.writers[shnum] = writer_class(shnum,
                                               server.get_rref(),
                                               self._storage_index,
                                               secrets,
                                               self._new_seqnum,
                                               self.required_shares,
                                               self.total_shares,
                                               self.segment_size,
                                               self.datalength)
            self.writers[shnum].server = server
            known_shares = self._servermap.get_known_shares()
            if (server, shnum) in known_shares:
                old_versionid, old_timestamp = known_shares[(server,shnum)]
                (old_seqnum, old_root_hash, old_salt, old_segsize,
                 old_datalength, old_k, old_N, old_prefix,
                 old_offsets_tuple) = old_versionid
                self.writers[shnum].set_checkstring(old_seqnum,
                                                    old_root_hash,
                                                    old_salt)
            elif (server, shnum) in self.bad_share_checkstrings:
                old_checkstring = self.bad_share_checkstrings[(server, shnum)]
                self.writers[shnum].set_checkstring(old_checkstring)

        # Our remote shares will not have a complete checkstring until
        # after we are done writing share data and have started to write
        # blocks. In the meantime, we need to know what to look for when
        # writing, so that we can detect UncoordinatedWriteErrors.
        self._checkstring = self.writers.values()[0].get_checkstring()

        # Now, we start pushing shares.
        self._status.timings["setup"] = time.time() - self._started
        # First, we encrypt, encode, and publish the shares that we need
        # to encrypt, encode, and publish.

        # This will eventually hold the block hash chain for each share
        # that we publish. We define it this way so that empty publishes
        # will still have something to write to the remote slot.
        self.blockhashes = dict([(i, []) for i in xrange(self.total_shares)])
        for i in xrange(self.total_shares):
            blocks = self.blockhashes[i]
            for j in xrange(self.num_segments):
                blocks.append(None)
        self.sharehash_leaves = None # eventually [sharehashes]
        self.sharehashes = {} # shnum -> [sharehash leaves necessary to
                              #           validate the share]

        self.log("Starting push")

        self._state = PUSHING_BLOCKS_STATE
        self._push()

        return self.done_deferred
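
    # A hedged usage sketch for publish() (hypothetical call site):
    #
    #   p = Publish(filenode, storage_broker, servermap)
    #   d = p.publish(MutableData(new_contents))
    #
    # publish() re-pushes the whole file, while update() above rewrites only
    # the touched segments.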

    def _update_status(self):
        self._status.set_status("Sending Shares: %d placed out of %d, "
                                "%d messages outstanding" %
                                (len(self.placed),
                                 len(self.goal),
                                 self.num_outstanding))
        self._status.set_progress(1.0 * len(self.placed) / len(self.goal))

    def setup_encoding_parameters(self, offset=0):
        if self._version == MDMF_VERSION:
            segment_size = DEFAULT_MAX_SEGMENT_SIZE # 128 KiB by default
        else:
            segment_size = self.datalength # SDMF is only one segment
        # this must be a multiple of self.required_shares
        segment_size = mathutil.next_multiple(segment_size,
                                              self.required_shares)
        self.segment_size = segment_size

        # Calculate the starting segment for the upload.
        if segment_size:
            # We use div_ceil instead of integer division here because
            # it is semantically correct.
            # If datalength isn't an even multiple of segment_size, but
            # is larger than segment_size, datalength // segment_size
            # will be the largest number such that num <= datalength and
            # num % segment_size == 0. But that's not what we want,
            # because it ignores the extra data. div_ceil will give us
            # the right number of segments for the data that we're
            # given.
            self.num_segments = mathutil.div_ceil(self.datalength,
                                                  segment_size)

            self.starting_segment = offset // segment_size

        else:
            self.num_segments = 0
            self.starting_segment = 0

        self.log("building encoding parameters for file")
        self.log("got segsize %d" % self.segment_size)
        self.log("got %d segments" % self.num_segments)

        if self._version == SDMF_VERSION:
            assert self.num_segments in (0, 1) # SDMF

        # calculate the tail segment size.
        if segment_size and self.datalength:
            self.tail_segment_size = self.datalength % segment_size
            self.log("got tail segment size %d" % self.tail_segment_size)
        else:
            self.tail_segment_size = 0

        if self.tail_segment_size == 0 and segment_size:
            # The tail segment is the same size as the other segments.
            self.tail_segment_size = segment_size
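
        # Worked example (assumed parameters, not from the original): an
        # MDMF file with datalength = 300 KiB = 307200 bytes and k = 3.
        # segment_size = next_multiple(131072, 3) = 131073, so
        # num_segments = div_ceil(307200, 131073) = 3 and
        # tail_segment_size = 307200 % 131073 = 45054.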

        # Make FEC encoders
        fec = codec.CRSEncoder()
        fec.set_params(self.segment_size,
                       self.required_shares, self.total_shares)
        self.piece_size = fec.get_block_size()
        self.fec = fec

        if self.tail_segment_size == self.segment_size:
            self.tail_fec = self.fec
        else:
            tail_fec = codec.CRSEncoder()
            tail_fec.set_params(self.tail_segment_size,
                                self.required_shares,
                                self.total_shares)
            self.tail_fec = tail_fec
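
        # Continuing the example above: with k = 3 and (say) N = 10
        # shares, fec.get_block_size() is 131073 / 3 = 43691, so every
        # full segment encodes into ten 43691-byte blocks, one per share,
        # and the tail encoder produces 45054 / 3 = 15018-byte blocks.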

        self._current_segment = self.starting_segment
        self.end_segment = self.num_segments - 1
        # Now figure out where the last segment should be.
        if self.data.get_size() != self.datalength:
            # We're updating a few segments in the middle of a mutable
            # file, so we don't want to republish the whole thing.
            # (we don't have enough data to do that even if we wanted
            # to)
            end = self.data.get_size()
            self.end_segment = end // segment_size
            if end % segment_size == 0:
                self.end_segment -= 1

        self.log("got start segment %d" % self.starting_segment)
        self.log("got end segment %d" % self.end_segment)
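
        # For an update() of the example file that rewrites bytes
        # [200000, 250000): offset = 200000 gives starting_segment = 1,
        # and data.get_size() = 250000 != datalength, so
        # end_segment = 250000 // 131073 = 1 -- only segment 1 is
        # re-encoded and republished. (Illustration with assumed numbers.)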

    def _push(self, ignored=None):
        """
        I manage state transitions. In particular, I see that we still
        have a good enough number of writers to complete the upload
        successfully.
        """
        # Can we still successfully publish this file?
        # TODO: Keep track of outstanding queries before aborting the
        #       process.
        if len(self.writers) < self.required_shares or self.surprised:
            return self._failure()

        # Figure out what we need to do next. Each of these needs to
        # return a deferred so that we don't block execution when this
        # is first called in the upload method.
        if self._state == PUSHING_BLOCKS_STATE:
            return self.push_segment(self._current_segment)

        elif self._state == PUSHING_EVERYTHING_ELSE_STATE:
            return self.push_everything_else()

        # If we make it to this point, we were successful in placing the
        # file.
        return self._done()

    def push_segment(self, segnum):
        if self.num_segments == 0 and self._version == SDMF_VERSION:
            self._add_dummy_salts()

        if segnum > self.end_segment:
            # We don't have any more segments to push.
            self._state = PUSHING_EVERYTHING_ELSE_STATE
            return self._push()

        d = self._encode_segment(segnum)
        d.addCallback(self._push_segment, segnum)
        def _increment_segnum(ign):
            self._current_segment += 1
        # XXX: I don't think we need to do addBoth here -- any errbacks
        # should be handled within push_segment.
        d.addCallback(_increment_segnum)
        d.addCallback(self._turn_barrier)
        d.addCallback(self._push)
        d.addErrback(self._failure)

    def _turn_barrier(self, result):
        """
        I help the publish process avoid the recursion limit issues
        that long Deferred chains can otherwise hit.
        """
        return fireEventually(result)
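
    # fireEventually() returns a Deferred that fires on a later reactor
    # turn, so inserting _turn_barrier between segment pushes unwinds the
    # call stack after each segment instead of recursing once per segment
    # through the whole Deferred chain.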

    def _add_dummy_salts(self):
        """
        SDMF files need a salt even if they're empty, or the signature
        won't make sense. This method adds a dummy salt to each of our
        SDMF writers so that they can write the signature later.
        """
        salt = os.urandom(16)
        assert self._version == SDMF_VERSION

        for writer in self.writers.itervalues():
            writer.put_salt(salt)

    def _encode_segment(self, segnum):
        """
        I encrypt and encode the segment segnum.
        """
        started = time.time()

        if segnum + 1 == self.num_segments:
            segsize = self.tail_segment_size
        else:
            segsize = self.segment_size

        self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
        data = self.data.read(segsize)
        # XXX: This is dumb. Why return a list?
        data = "".join(data)

        assert len(data) == segsize, len(data)

        salt = os.urandom(16)

        key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
        self._status.set_status("Encrypting")
        enc = AES(key)
        crypttext = enc.process(data)
        assert len(crypttext) == len(data)

        now = time.time()
        self._status.accumulate_encrypt_time(now - started)
        started = now

        # now apply FEC
        if segnum + 1 == self.num_segments:
            fec = self.tail_fec
        else:
            fec = self.fec

        self._status.set_status("Encoding")
        crypttext_pieces = [None] * self.required_shares
        piece_size = fec.get_block_size()
        for i in range(len(crypttext_pieces)):
            offset = i * piece_size
            piece = crypttext[offset:offset+piece_size]
            piece = piece + "\x00"*(piece_size - len(piece)) # padding
            crypttext_pieces[i] = piece
            assert len(piece) == piece_size
        d = fec.encode(crypttext_pieces)
        def _done_encoding(res):
            elapsed = time.time() - started
            self._status.accumulate_encode_time(elapsed)
            return (res, salt)
        d.addCallback(_done_encoding)
        return d

    def _push_segment(self, encoded_and_salt, segnum):
        """
        I push (data, salt) as segment number segnum.
        """
        results, salt = encoded_and_salt
        shares, shareids = results
        self._status.set_status("Pushing segment")
        for i in xrange(len(shares)):
            sharedata = shares[i]
            shareid = shareids[i]
            if self._version == MDMF_VERSION:
                hashed = salt + sharedata
            else:
                hashed = sharedata
            block_hash = hashutil.block_hash(hashed)
            self.blockhashes[shareid][segnum] = block_hash
            # find the writer for this share
            writer = self.writers[shareid]
            writer.put_block(sharedata, segnum, salt)
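
    # Note on the hash above: MDMF uses a fresh salt per segment, so the
    # block hash must bind the salt to the block data; SDMF has a single
    # file-wide IV, which is covered by the signed prefix instead, so its
    # block hash covers only the share data.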

    def push_everything_else(self):
        """
        I put everything else associated with a share.
        """
        self._pack_started = time.time()
        self.push_encprivkey()
        self.push_blockhashes()
        self.push_sharehashes()
        self.push_toplevel_hashes_and_signature()
        d = self.finish_publishing()
        def _change_state(ignored):
            self._state = DONE_STATE
        d.addCallback(_change_state)
        d.addCallback(self._push)
        return d

    def push_encprivkey(self):
        encprivkey = self._encprivkey
        self._status.set_status("Pushing encrypted private key")
        for writer in self.writers.itervalues():
            writer.put_encprivkey(encprivkey)

    def push_blockhashes(self):
        self.sharehash_leaves = [None] * len(self.blockhashes)
        self._status.set_status("Building and pushing block hash tree")
        for shnum, blockhashes in self.blockhashes.iteritems():
            t = hashtree.HashTree(blockhashes)
            self.blockhashes[shnum] = list(t)
            # set the leaf for future use.
            self.sharehash_leaves[shnum] = t[0]

            writer = self.writers[shnum]
            writer.put_blockhashes(self.blockhashes[shnum])
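
    # Hedged illustration of the hashtree API as used above: HashTree
    # takes a list of leaf hashes (padded internally to a power of two),
    # stores the binary tree as a flat breadth-first list with the root
    # at index 0, and iterates in that order:
    #
    #   t = hashtree.HashTree([h0, h1, h2, h3])
    #   t[0]        # the root (Merkle) hash
    #   list(t)     # the serialized tree that put_blockhashes() writes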

    def push_sharehashes(self):
        self._status.set_status("Building and pushing share hash chain")
        share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
        for shnum in xrange(len(self.sharehash_leaves)):
            needed_indices = share_hash_tree.needed_hashes(shnum)
            self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
                                              for i in needed_indices] )
            writer = self.writers[shnum]
            writer.put_sharehashes(self.sharehashes[shnum])
        self.root_hash = share_hash_tree[0]
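
    # needed_hashes(shnum) names the uncle chain: the sibling of leaf
    # shnum plus the sibling of each of its ancestors, which is exactly
    # what a reader needs (together with the share's block hash tree) to
    # verify share shnum against self.root_hash.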

    def push_toplevel_hashes_and_signature(self):
        # We need to do three things here:
        #   - Push the root hash and salt hash
        #   - Get the checkstring of the resulting layout; sign that.
        #   - Push the signature
        self._status.set_status("Pushing root hashes and signature")
        for shnum in xrange(self.total_shares):
            writer = self.writers[shnum]
            writer.put_root_hash(self.root_hash)
        self._update_checkstring()
        self._make_and_place_signature()

    def _update_checkstring(self):
        """
        After putting the root hash, MDMF files will have the
        checkstring written to the storage server. This means that we
        can update our copy of the checkstring so we can detect
        uncoordinated writes. SDMF files will have the same checkstring,
        so we need not do anything.
        """
        self._checkstring = self.writers.values()[0].get_checkstring()

    def _make_and_place_signature(self):
        """
        I create and place the signature.
        """
        started = time.time()
        self._status.set_status("Signing prefix")
        signable = self.writers[0].get_signable()
        self.signature = self._privkey.sign(signable)

        for (shnum, writer) in self.writers.iteritems():
            writer.put_signature(self.signature)
        self._status.timings['sign'] = time.time() - started

    def finish_publishing(self):
        # We're almost done -- we just need to put the verification key
        # and the offsets in place.
        started = time.time()
        self._status.set_status("Pushing shares")
        self._started_pushing = started
        ds = []
        verification_key = self._pubkey.serialize()

        for (shnum, writer) in self.writers.copy().iteritems():
            writer.put_verification_key(verification_key)
            self.num_outstanding += 1
            def _no_longer_outstanding(res):
                self.num_outstanding -= 1
                return res
            d = writer.finish_publishing()
            d.addBoth(_no_longer_outstanding)
            d.addErrback(self._connection_problem, writer)
            d.addCallback(self._got_write_answer, writer, started)
            ds.append(d)
        self._record_verinfo()
        self._status.timings['pack'] = time.time() - started
        return defer.DeferredList(ds)

    def _record_verinfo(self):
        self.versioninfo = self.writers.values()[0].get_verinfo()

    def _connection_problem(self, f, writer):
        """
        We ran into a connection problem while working with writer, and
        need to deal with that.
        """
        self.log("found problem: %s" % str(f))
        self._last_failure = f
        del self.writers[writer.shnum]

    def log_goal(self, goal, message=""):
        logmsg = [message]
        for (shnum, server) in sorted([(s,p) for (p,s) in goal]):
            logmsg.append("sh%d to [%s]" % (shnum, server.get_name()))
        self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
        self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
                 level=log.NOISY)

    def update_goal(self):
        # if log.recording_noisy
        if True:
            self.log_goal(self.goal, "before update: ")

        # first, remove any bad servers from our goal
        self.goal = set([ (server, shnum)
                          for (server, shnum) in self.goal
                          if server not in self.bad_servers ])

        # find the homeless shares:
        homefull_shares = set([shnum for (server, shnum) in self.goal])
        homeless_shares = set(range(self.total_shares)) - homefull_shares
        homeless_shares = sorted(list(homeless_shares))
        # place them somewhere. We prefer unused servers at the beginning of
        # the available server list.

        if not homeless_shares:
            return

        # if an old share X is on a node, put the new share X there too.
        # TODO: 1: redistribute shares to achieve one-per-server, by copying
        #          shares from existing servers to new (less-crowded) ones.
        #          The old shares must still be updated.
        # TODO: 2: move those shares instead of copying them, to reduce
        #          future update bandwidth

        # this is a bit CPU intensive but easy to analyze. We create a sort
        # order for each server. If the server is marked as bad, we don't
        # even put them in the list. Then we care about the number of shares
        # which have already been assigned to them. After that we care about
        # their permutation order.
        old_assignments = DictOfSets()
        for (server, shnum) in self.goal:
            old_assignments.add(server, shnum)

        serverlist = []
        for i, server in enumerate(self.full_serverlist):
            serverid = server.get_serverid()
            if server in self.bad_servers:
                continue
            entry = (len(old_assignments.get(server, [])), i, serverid, server)
            serverlist.append(entry)
        serverlist.sort()

        if not serverlist:
            raise NotEnoughServersError("Ran out of non-bad servers, "
                                        "first_error=%s" %
                                        str(self._first_write_error),
                                        self._first_write_error)

        # we then index this serverlist with an integer, because we may have
        # to wrap. We update the goal as we go.
        i = 0
        for shnum in homeless_shares:
            (ignored1, ignored2, ignored3, server) = serverlist[i]
            # if we are forced to send a share to a server that already has
            # one, we may have two write requests in flight, and the
            # servermap (which was computed before either request was sent)
            # won't reflect the new shares, so the second response will be
            # surprising. There is code in _got_write_answer() to tolerate
            # this, otherwise it would cause the publish to fail with an
            # UncoordinatedWriteError. See #546 for details of the trouble
            # this used to cause.
            self.goal.add( (server, shnum) )
            i += 1
            if i >= len(serverlist):
                i = 0

        if True:
            self.log_goal(self.goal, "after update: ")
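
    # Hedged note on the sort above: serverlist entries compare as
    # (shares_already_assigned, permuted_index, serverid, server), so
    # servers holding no shares sort first, and ties fall back to the
    # permuted order that get_servers_for_psi() produced.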

    def _got_write_answer(self, answer, writer, started):
        if not answer:
            # SDMF writers only pretend to write when readers set their
            # blocks, salts, and so on -- they actually just write once,
            # at the end of the upload process. In fake writes, they
            # return defer.succeed(None). If we see that, we shouldn't
            # bother checking it.
            return

        server = writer.server
        lp = self.log("_got_write_answer from %s, share %d" %
                      (server.get_name(), writer.shnum))

        now = time.time()
        elapsed = now - started

        self._status.add_per_server_time(server, elapsed)

        wrote, read_data = answer

        surprise_shares = set(read_data.keys()) - set([writer.shnum])

        # We need to remove from surprise_shares any shares that we are
        # knowingly also writing to that server from other writers.

        # TODO: Precompute this.
        known_shnums = [x.shnum for x in self.writers.values()
                        if x.server == server]
        surprise_shares -= set(known_shnums)
        self.log("found the following surprise shares: %s" %
                 str(surprise_shares))

        # Now surprise_shares contains all of the shares that we did not
        # expect to be there.

        surprised = False
        for shnum in surprise_shares:
            # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
            checkstring = read_data[shnum][0]
            # What we want to do here is to see if their (seqnum,
            # roothash, salt) is the same as our (seqnum, roothash,
            # salt), or the equivalent for MDMF. The best way to do this
            # is to store a packed representation of our checkstring
            # somewhere, then not bother unpacking the other
            # checkstrings at all.
            if checkstring == self._checkstring:
                # they have the right share, somehow
                if (server,shnum) in self.goal:
                    # and we want them to have it, so we probably sent them a
                    # copy in an earlier write. This is ok, and avoids the
                    # #546 problem.
                    continue

                # They aren't in our goal, but they are still for the right
                # version. Somebody else wrote them, and it's a convergent
                # uncoordinated write. Pretend this is ok (don't be
                # surprised), since I suspect there's a decent chance that
                # we'll hit this in normal operation.
                continue

            else:
                # the new shares are of a different version
                if server in self._servermap.get_reachable_servers():
                    # we asked them about their shares, so we had knowledge
                    # of what they used to have. Any surprising shares must
                    # have come from someone else, so UCW.
                    surprised = True
                else:
                    # we didn't ask them, and now we've discovered that they
                    # have a share we didn't know about. This indicates that
                    # mapupdate should have worked harder and asked more
                    # servers before concluding that it knew about them all.

                    # signal UCW, but make sure to ask this server next time,
                    # so we'll remember to update it if/when we retry.
                    surprised = True
                    # TODO: ask this server next time. I don't yet have a good
                    # way to do this. Two insufficient possibilities are:
                    #
                    # self._servermap.add_new_share(server, shnum, verinfo, now)
                    #  but that requires fetching/validating/parsing the whole
                    #  version string, and all we have is the checkstring
                    # self._servermap.mark_bad_share(server, shnum, checkstring)
                    #  that will make publish overwrite the share next time,
                    #  but it won't re-query the server, and it won't make
                    #  mapupdate search further

                    # TODO later: when publish starts, do
                    # servermap.get_best_version(), extract the seqnum,
                    # subtract one, and store as highest-replaceable-seqnum.
                    # Then, if this surprise-because-we-didn't-ask share is
                    # of highest-replaceable-seqnum or lower, we're allowed
                    # to replace it: send out a new writev (or rather add it
                    # to self.goal and loop).
                    pass

        if surprised:
            self.log("they had shares %s that we didn't know about" %
                     (list(surprise_shares),),
                     parent=lp, level=log.WEIRD, umid="un9CSQ")
            self.surprised = True

        if not wrote:
            # TODO: there are two possibilities. The first is that the server
            # is full (or just doesn't want to give us any room), which means
            # we shouldn't ask them again, but is *not* an indication of an
            # uncoordinated write. The second is that our testv failed, which
            # *does* indicate an uncoordinated write. We currently don't have
            # a way to tell these two apart (in fact, the storage server code
            # doesn't have the option of refusing our share).
            #
            # If the server is full, mark the server as bad (so we don't ask
            # them again), but don't set self.surprised. The loop() will find
            # a new server.
            #
            # If the testv failed, log it, set self.surprised, but don't
            # bother adding to self.bad_servers .

            self.log("our testv failed, so the write did not happen",
                     parent=lp, level=log.WEIRD, umid="8sc26g")
            self.surprised = True
            self.bad_servers.add(server) # don't ask them again
            # use the checkstring to add information to the log message
            unknown_format = False
            for (shnum,readv) in read_data.items():
                checkstring = readv[0]
                version = get_version_from_checkstring(checkstring)
                if version == MDMF_VERSION:
                    (other_seqnum,
                     other_roothash) = unpack_mdmf_checkstring(checkstring)
                elif version == SDMF_VERSION:
                    (other_seqnum,
                     other_roothash,
                     other_IV) = unpack_sdmf_checkstring(checkstring)
                else:
                    unknown_format = True
                expected_version = self._servermap.version_on_server(server,
                                                                     shnum)
                if expected_version:
                    (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                     offsets_tuple) = expected_version
                    msg = ("somebody modified the share on us:"
                           " shnum=%d: I thought they had #%d:R=%s," %
                           (shnum,
                            seqnum, base32.b2a(root_hash)[:4]))
                    if unknown_format:
                        msg += (" but I don't know how to read share"
                                " format %d" % version)
                    else:
                        msg += " but testv reported #%d:R=%s" % \
                               (other_seqnum, other_roothash)
                    self.log(msg, parent=lp, level=log.NOISY)
                # if expected_version==None, then we didn't expect to see a
                # share on that server, and the 'surprise_shares' clause
                # above will have logged it.
            return

        # and update the servermap
        # self.versioninfo is set during the last phase of publishing.
        # If we get there, we know that responses correspond to placed
        # shares, and can safely execute these statements.
        if self.versioninfo:
            self.log("wrote successfully: adding new share to servermap")
            self._servermap.add_new_share(server, writer.shnum,
                                          self.versioninfo, started)
            self.placed.add( (server, writer.shnum) )

        self._update_status()
        # the next method in the deferred chain will check to see if
        # we're done and successful.
        return

    def _done(self):
        if not self._running:
            return
        self._running = False
        now = time.time()
        self._status.timings["total"] = now - self._started

        elapsed = now - self._started_pushing
        self._status.timings['push'] = elapsed

        self._status.set_active(False)
        self.log("Publish done, success")
        self._status.set_status("Finished")
        self._status.set_progress(1.0)
        # Get k and segsize, then give them to the caller.
        hints = {}
        hints['segsize'] = self.segment_size
        hints['k'] = self.required_shares
        self._node.set_downloader_hints(hints)
        eventually(self.done_deferred.callback, None)

    def _failure(self, f=None):
        if f:
            self._last_failure = f

        if not self.surprised:
            # We ran out of servers
            msg = "Publish ran out of good servers"
            if self._last_failure:
                msg += ", last failure was: %s" % str(self._last_failure)
            self.log(msg)
            e = NotEnoughServersError(msg)

        else:
            # We ran into shares that we didn't recognize, which means
            # that we need to return an UncoordinatedWriteError.
            self.log("Publish failed with UncoordinatedWriteError")
            e = UncoordinatedWriteError()
        f = failure.Failure(e)
        eventually(self.done_deferred.callback, f)


class MutableFileHandle:
    """
    I am a mutable uploadable built around a filehandle-like object,
    usually either a StringIO instance or a handle to an actual file.
    """
    implements(IMutableUploadable)

    def __init__(self, filehandle):
        # The filehandle is defined as a generally file-like object that
        # has these two methods. We don't care beyond that.
        assert hasattr(filehandle, "read")
        assert hasattr(filehandle, "close")

        self._filehandle = filehandle
        # We must start reading at the beginning of the file, or we risk
        # encountering errors when the data read does not match the size
        # reported to the uploader.
        self._filehandle.seek(0)

        # We have not yet read anything, so our position is 0.
        self._marker = 0

    def get_size(self):
        """
        I return the amount of data in my filehandle.
        """
        if not hasattr(self, "_size"):
            old_position = self._filehandle.tell()
            # Seek to the end of the file by seeking 0 bytes from the
            # file's end.
            self._filehandle.seek(0, 2) # 2 == os.SEEK_END in 2.5+
            self._size = self._filehandle.tell()
            # Restore the previous position, in case this was called
            # after a read.
            self._filehandle.seek(old_position)
            assert self._filehandle.tell() == old_position

        assert hasattr(self, "_size")
        return self._size

    def pos(self):
        """
        I return the position of my read marker -- i.e., how much data I
        have already read and returned to callers.
        """
        return self._marker

    def read(self, length):
        """
        I return some data (up to length bytes) from my filehandle.

        In most cases, I return length bytes, but sometimes I won't --
        for example, if I am asked to read beyond the end of a file, or
        an error occurs.
        """
        results = self._filehandle.read(length)
        self._marker += len(results)
        return results

    def close(self):
        """
        I close the underlying filehandle. Any further operations on the
        filehandle fail at this point.
        """
        self._filehandle.close()


class MutableData(MutableFileHandle):
    """
    I am a mutable uploadable built around a string, which I then cast
    into a StringIO and treat as a filehandle.
    """

    def __init__(self, s):
        # Take a string and return a file-like uploadable.
        assert isinstance(s, str)

        MutableFileHandle.__init__(self, StringIO(s))
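
# A hedged example of the IMutableUploadable surface these classes expose:
#
#   u = MutableData("hello world")
#   u.get_size()   # => 11
#   u.read(5)      # => "hello"; u.pos() is now 5
#   u.read(100)    # => " world"; reads past EOF just return what is left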


class TransformingUploadable:
    """
    I am an IMutableUploadable that wraps another IMutableUploadable,
    and some segments that are already on the grid. When I am called to
    read, I handle merging of boundary segments.
    """
    implements(IMutableUploadable)

    def __init__(self, data, offset, segment_size, start, end):
        assert IMutableUploadable.providedBy(data)

        self._newdata = data
        self._offset = offset
        self._segment_size = segment_size
        self._start = start
        self._end = end

        self._read_marker = 0

        self._first_segment_offset = offset % segment_size

        num = self.log("TransformingUploadable: starting", parent=None)
        self._log_number = num
        self.log("got fso: %d" % self._first_segment_offset)
        self.log("got offset: %d" % self._offset)

    def log(self, *args, **kwargs):
        if 'parent' not in kwargs:
            kwargs['parent'] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.transforminguploadable"
        return log.msg(*args, **kwargs)

    def get_size(self):
        return self._offset + self._newdata.get_size()

    def read(self, length):
        # We can get data from 3 sources here.
        #   1. The first of the segments provided to us.
        #   2. The data that we're replacing things with.
        #   3. The last of the segments provided to us.

        # are we in state 0?
        self.log("reading %d bytes" % length)

        old_start_data = ""
        old_data_length = self._first_segment_offset - self._read_marker
        if old_data_length > 0:
            if old_data_length > length:
                old_data_length = length
            self.log("returning %d bytes of old start data" % old_data_length)

            old_data_end = old_data_length + self._read_marker
            old_start_data = self._start[self._read_marker:old_data_end]
            length -= old_data_length
        else:
            # otherwise calculations later get screwed up.
            old_data_length = 0

        # Is there enough new data to satisfy this read? If not, we need
        # to pad the end of the data with data from our last segment.
        old_end_length = length - \
            (self._newdata.get_size() - self._newdata.pos())
        old_end_data = ""
        if old_end_length > 0:
            self.log("reading %d bytes of old end data" % old_end_length)

            # TODO: We're not explicitly checking for tail segment size
            # here. Is that a problem?
            old_data_offset = (length - old_end_length + \
                               old_data_length) % self._segment_size
            self.log("reading at offset %d" % old_data_offset)
            old_end = old_data_offset + old_end_length
            old_end_data = self._end[old_data_offset:old_end]
            length -= old_end_length
            assert length == self._newdata.get_size() - self._newdata.pos()

        self.log("reading %d bytes of new data" % length)
        new_data = self._newdata.read(length)
        new_data = "".join(new_data)

        self._read_marker += len(old_start_data + new_data + old_end_data)

        return old_start_data + new_data + old_end_data
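
# Worked example of the three-source merge in read() above (assumed
# numbers): with segment_size=10 and offset=13, _first_segment_offset is 3.
# The first read(10) returns self._start[0:3] (the old bytes of the first
# touched segment that precede the splice point) followed by 7 bytes of new
# data; once the new data is exhausted, subsequent reads are padded from
# self._end, the old tail segment.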