4 from StringIO import StringIO
5 from itertools import count
6 from zope.interface import implements
7 from twisted.internet import defer
8 from twisted.python import failure
9 from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION, \
10 IMutableUploadable
11 from allmydata.util import base32, hashutil, mathutil, log
12 from allmydata.util.dictutil import DictOfSets
13 from allmydata import hashtree, codec
14 from allmydata.storage.server import si_b2a
15 from pycryptopp.cipher.aes import AES
16 from foolscap.api import eventually, fireEventually
18 from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, MODE_REPAIR, \
19 UncoordinatedWriteError, NotEnoughServersError
20 from allmydata.mutable.servermap import ServerMap
21 from allmydata.mutable.layout import get_version_from_checkstring,\
22 unpack_mdmf_checkstring, \
23 unpack_sdmf_checkstring, \
24 MDMFSlotWriteProxy, \
25 SDMFSlotWriteProxy

27 KiB = 1024
28 DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
29 PUSHING_BLOCKS_STATE = 0
30 PUSHING_EVERYTHING_ELSE_STATE = 1
31 DONE_STATE = 2
34 implements(IPublishStatus)
35 statusid_counter = count(0)
38 self.timings["send_per_server"] = {}
39 self.timings["encrypt"] = 0.0
40 self.timings["encode"] = 0.0
44 self.storage_index = None
46 self.encoding = ("?", "?")
48 self.status = "Not started"
50 self.counter = self.statusid_counter.next()
51 self.started = time.time()
53 def add_per_server_time(self, server, elapsed):
54 if server not in self.timings["send_per_server"]:
55 self.timings["send_per_server"][server] = []
56 self.timings["send_per_server"][server].append(elapsed)
57 def accumulate_encode_time(self, elapsed):
58 self.timings["encode"] += elapsed
59 def accumulate_encrypt_time(self, elapsed):
60 self.timings["encrypt"] += elapsed
62 def get_started(self):
64 def get_storage_index(self):
65 return self.storage_index
66 def get_encoding(self):
68 def using_helper(self):
70 def get_servermap(self):
76 def get_progress(self):
80 def get_counter(self):
82 def get_problems(self):
85 def set_storage_index(self, si):
86 self.storage_index = si
87 def set_helper(self, helper):
89 def set_servermap(self, servermap):
90 self.servermap = servermap
91 def set_encoding(self, k, n):
92 self.encoding = (k, n)
93 def set_size(self, size):
95 def set_status(self, status):
97 def set_progress(self, value):
99 def set_active(self, value):
102 class LoopLimitExceededError(Exception):
106 """I represent a single act of publishing the mutable file to the grid. I
107 will only publish my data if the servermap I am using still represents
108 the current state of the world.
110 To make the initial publish, set servermap to None.
113 def __init__(self, filenode, storage_broker, servermap):
114 self._node = filenode
115 self._storage_broker = storage_broker
116 self._servermap = servermap
117 self._storage_index = self._node.get_storage_index()
118 self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
119 num = self.log("Publish(%s): starting" % prefix, parent=None)
120 self._log_number = num
122 self._first_write_error = None
123 self._last_failure = None
125 self._status = PublishStatus()
126 self._status.set_storage_index(self._storage_index)
127 self._status.set_helper(False)
128 self._status.set_progress(0.0)
129 self._status.set_active(True)
130 self._version = self._node.get_version()
131 assert self._version in (SDMF_VERSION, MDMF_VERSION)
134 def get_status(self):
137 def log(self, *args, **kwargs):
138 if 'parent' not in kwargs:
139 kwargs['parent'] = self._log_number
140 if "facility" not in kwargs:
141 kwargs["facility"] = "tahoe.mutable.publish"
142 return log.msg(*args, **kwargs)
145 def update(self, data, offset, blockhashes, version):
147 I replace the contents of this file with the contents of data,
148 starting at offset. I return a Deferred that fires with None
149 when the replacement has been completed, or with an error if
150 something went wrong during the process.
152 Note that this process will not upload new shares. If the file
153 being updated is in need of repair, callers will have to repair
157 # 1: Make server assignments. We'll assign each share that we know
158 # about on the grid to the server that currently holds that
159 # share, and will not place any new shares.
160 # 2: Setup encoding parameters. Most of these will stay the same
161 # -- datalength will change, as will some of the offsets.
162 # 3. Upload the new segments.
164 assert IMutableUploadable.providedBy(data)
168 # XXX: Use the MutableFileVersion instead.
169 self.datalength = self._node.get_size()
170 if data.get_size() > self.datalength:
171 self.datalength = data.get_size()
173 self.log("starting update")
174 self.log("adding new data of length %d at offset %d" % \
175 (data.get_size(), offset))
176 self.log("new data length is %d" % self.datalength)
177 self._status.set_size(self.datalength)
178 self._status.set_status("Started")
179 self._started = time.time()
181 self.done_deferred = defer.Deferred()
183 self._writekey = self._node.get_writekey()
184 assert self._writekey, "need write capability to publish"
186 # first, which servers will we publish to? We require that the
187 # servermap was updated in MODE_WRITE, so we can depend upon the
188 # serverlist computed by that process instead of computing our own.
189 assert self._servermap
190 assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
191 # we will push a version that is one larger than anything present
192 # in the grid, according to the servermap.
193 self._new_seqnum = self._servermap.highest_seqnum() + 1
194 self._status.set_servermap(self._servermap)
196 self.log(format="new seqnum will be %(seqnum)d",
197 seqnum=self._new_seqnum, level=log.NOISY)
199 # We're updating an existing file, so all of the following
200 # should be available.
201 self.readkey = self._node.get_readkey()
202 self.required_shares = self._node.get_required_shares()
203 assert self.required_shares is not None
204 self.total_shares = self._node.get_total_shares()
205 assert self.total_shares is not None
206 self._status.set_encoding(self.required_shares, self.total_shares)
208 self._pubkey = self._node.get_pubkey()
210 self._privkey = self._node.get_privkey()
212 self._encprivkey = self._node.get_encprivkey()
214 sb = self._storage_broker
215 full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
216 self.full_serverlist = full_serverlist # for use later, immutable
217 self.bad_servers = set() # servers who have errbacked/refused requests
219 # This will set self.segment_size, self.num_segments, and
220 # self.fec. TODO: Does it know how to do the offset? Probably
221 # not. So do that part next.
222 self.setup_encoding_parameters(offset=offset)
224 # if we experience any surprises (writes which were rejected because
225 # our test vector did not match, or shares which we didn't expect to
226 # see), we set this flag and report an UncoordinatedWriteError at the
227 # end of the publish process.
228 self.surprised = False
230 # we keep track of three tables. The first is our goal: which share
231 # we want to see on which servers. This is initially populated by the
232 # existing servermap.
233 self.goal = set() # pairs of (server, shnum) tuples
235 # the second is the number of outstanding queries: those that are in flight and
236 # may or may not be delivered, accepted, or acknowledged. This is
237 # incremented when a query is sent, and decremented when the response
238 # returns or errbacks.
239 self.num_outstanding = 0
241 # the third is a table of successes: shares which have actually been
242 # placed. These are populated when responses come back with success.
243 # When self.placed == self.goal, we're done.
244 self.placed = set() # (server, shnum) tuples
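# (Illustrative sketch, not from the original source: if the servermap
# knows about shares 0 and 1 on serverA and share 2 on serverB, then
# goal starts as {(serverA, 0), (serverA, 1), (serverB, 2)}; each
# successful write answer adds its (server, shnum) pair to self.placed,
# and the publish is finished once placed covers goal.)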
246 self.bad_share_checkstrings = {}
248 # This is set at the last step of the publishing process.
249 self.versioninfo = ""
251 # we use the servermap to populate the initial goal: this way we will
252 # try to update each existing share in place. Since we're
253 # updating, we ignore damaged and missing shares -- callers must
254 # run a repair to recreate these.
255 self.goal = set(self._servermap.get_known_shares())
257 # shnum -> set of IMutableSlotWriter
258 self.writers = DictOfSets()
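# (Sketch: DictOfSets keeps one set of writers per share number, so if
# share 3 lives on two servers, self.writers[3] ends up holding two
# slot-writer proxies and the send loops below iterate over both.)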
260 # SDMF files are updated differently.
261 self._version = MDMF_VERSION
262 writer_class = MDMFSlotWriteProxy
264 # For each (server, shnum) in self.goal, we make a
265 # write proxy for that server. We'll use this to write
266 # shares to the server.
267 for (server,shnum) in self.goal:
268 write_enabler = self._node.get_write_enabler(server)
269 renew_secret = self._node.get_renewal_secret(server)
270 cancel_secret = self._node.get_cancel_secret(server)
271 secrets = (write_enabler, renew_secret, cancel_secret)
273 writer = writer_class(shnum,
278 self.required_shares,
283 self.writers.add(shnum, writer)
284 writer.server = server
285 known_shares = self._servermap.get_known_shares()
286 assert (server, shnum) in known_shares
287 old_versionid, old_timestamp = known_shares[(server,shnum)]
288 (old_seqnum, old_root_hash, old_salt, old_segsize,
289 old_datalength, old_k, old_N, old_prefix,
290 old_offsets_tuple) = old_versionid
291 writer.set_checkstring(old_seqnum,
295 # Our remote shares will not have a complete checkstring until
296 # after we are done writing share data and have started to write
297 # blocks. In the meantime, we need to know what to look for when
298 # writing, so that we can detect UncoordinatedWriteErrors.
299 self._checkstring = self._get_some_writer().get_checkstring()
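# (Roughly speaking -- judging from the unpack_*_checkstring calls in
# _got_write_answer below -- the checkstring is the packed share prefix:
# (seqnum, root_hash) for MDMF and (seqnum, root_hash, IV) for SDMF.)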
301 # Now, we start pushing shares.
302 self._status.timings["setup"] = time.time() - self._started
303 # First, we encrypt, encode, and publish the shares that we need
304 # to encrypt, encode, and publish.
306 # Our update process fetched these for us. We need to update
307 # them in place as publishing happens.
308 self.blockhashes = {} # shnum -> [blockhashes]
309 for (i, bht) in blockhashes.iteritems():
310 # We need to extract the leaves from our old hash tree.
311 old_segcount = mathutil.div_ceil(version[4],
313 h = hashtree.IncompleteHashTree(old_segcount)
314 bht = dict(enumerate(bht))
316 leaves = h[h.get_leaf_index(0):]
317 for j in xrange(self.num_segments - len(leaves)):
320 assert len(leaves) >= self.num_segments
321 self.blockhashes[i] = leaves
322 # This list will now be the leaves that were set during the
323 # initial upload + enough empty hashes to make it a
324 # power-of-two. If we exceed a power of two boundary, we
325 # should be encoding the file over again, and should not be
327 #assert len(self.blockhashes[i]) == \
328 # hashtree.roundup_pow2(self.num_segments), \
329 # len(self.blockhashes[i])
330 # XXX: Except this doesn't work. Figure out why.
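# (Editor's note on the arithmetic: hashtree.roundup_pow2(6) == 8, so a
# file originally uploaded in six segments would need two empty leaves
# to reach a power-of-two tree; the loop above, however, only pads up to
# self.num_segments, which may be why the assert stays disabled.)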
332 # These are filled in later, after we've modified the block hash
334 self.sharehash_leaves = None # eventually [sharehashes]
335 self.sharehashes = {} # shnum -> [sharehash leaves necessary to
336 # validate the share]
338 self.log("Starting push")
340 self._state = PUSHING_BLOCKS_STATE
343 return self.done_deferred
346 def publish(self, newdata):
347 """Publish the filenode's current contents. Returns a Deferred that
348 fires (with None) when the publish has done as much work as it's ever
349 going to do, or errbacks with ConsistencyError if it detects a
353 # 0. Setup encoding parameters, encoder, and other such things.
354 # 1. Encrypt, encode, and publish segments.
355 assert IMutableUploadable.providedBy(newdata)
358 self.datalength = newdata.get_size()
359 #if self.datalength >= DEFAULT_MAX_SEGMENT_SIZE:
360 # self._version = MDMF_VERSION
362 # self._version = SDMF_VERSION
364 self.log("starting publish, datalen is %s" % self.datalength)
365 self._status.set_size(self.datalength)
366 self._status.set_status("Started")
367 self._started = time.time()
369 self.done_deferred = defer.Deferred()
371 self._writekey = self._node.get_writekey()
372 assert self._writekey, "need write capability to publish"
374 # first, which servers will we publish to? We require that the
375 # servermap was updated in MODE_WRITE, so we can depend upon the
376 # serverlist computed by that process instead of computing our own.
378 assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
379 # we will push a version that is one larger than anything present
380 # in the grid, according to the servermap.
381 self._new_seqnum = self._servermap.highest_seqnum() + 1
383 # If we don't have a servermap, that's because we're doing the
386 self._servermap = ServerMap()
387 self._status.set_servermap(self._servermap)
389 self.log(format="new seqnum will be %(seqnum)d",
390 seqnum=self._new_seqnum, level=log.NOISY)
392 # having an up-to-date servermap (or using a filenode that was just
393 # created for the first time) also guarantees that the following
394 # fields are available
395 self.readkey = self._node.get_readkey()
396 self.required_shares = self._node.get_required_shares()
397 assert self.required_shares is not None
398 self.total_shares = self._node.get_total_shares()
399 assert self.total_shares is not None
400 self._status.set_encoding(self.required_shares, self.total_shares)
402 self._pubkey = self._node.get_pubkey()
404 self._privkey = self._node.get_privkey()
406 self._encprivkey = self._node.get_encprivkey()
408 sb = self._storage_broker
409 full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
410 self.full_serverlist = full_serverlist # for use later, immutable
411 self.bad_servers = set() # servers who have errbacked/refused requests
413 # This will set self.segment_size, self.num_segments, and
415 self.setup_encoding_parameters()
417 # if we experience any surprises (writes which were rejected because
418 # our test vector did not match, or shares which we didn't expect to
419 # see), we set this flag and report an UncoordinatedWriteError at the
420 # end of the publish process.
421 self.surprised = False
423 # we keep track of three tables. The first is our goal: which share
424 # we want to see on which servers. This is initially populated by the
425 # existing servermap.
426 self.goal = set() # pairs of (server, shnum) tuples
428 # the second is the number of outstanding queries: those that are in flight and
429 # may or may not be delivered, accepted, or acknowledged. This is
430 # incremented when a query is sent, and decremented when the response
431 # returns or errbacks.
432 self.num_outstanding = 0
434 # the third is a table of successes: shares which have actually been
435 # placed. These are populated when responses come back with success.
436 # When self.placed == self.goal, we're done.
437 self.placed = set() # (server, shnum) tuples
439 self.bad_share_checkstrings = {}
441 # This is set at the last step of the publishing process.
442 self.versioninfo = ""
444 # we use the servermap to populate the initial goal: this way we will
445 # try to update each existing share in place.
446 self.goal = set(self._servermap.get_known_shares())
448 # then we add in all the shares that were bad (corrupted, bad
449 # signatures, etc). We want to replace these.
450 for key, old_checkstring in self._servermap.get_bad_shares().items():
451 (server, shnum) = key
452 self.goal.add( (server,shnum) )
453 self.bad_share_checkstrings[(server,shnum)] = old_checkstring
455 # TODO: Make this part do server selection.
458 # shnum -> set of IMutableSlotWriter
459 self.writers = DictOfSets()
461 if self._version == MDMF_VERSION:
462 writer_class = MDMFSlotWriteProxy
464 writer_class = SDMFSlotWriteProxy
466 # For each (server, shnum) in self.goal, we make a
467 # write proxy for that server. We'll use this to write
468 # shares to the server.
469 for (server,shnum) in self.goal:
470 write_enabler = self._node.get_write_enabler(server)
471 renew_secret = self._node.get_renewal_secret(server)
472 cancel_secret = self._node.get_cancel_secret(server)
473 secrets = (write_enabler, renew_secret, cancel_secret)
475 writer = writer_class(shnum,
480 self.required_shares,
484 self.writers.add(shnum, writer)
485 writer.server = server
486 known_shares = self._servermap.get_known_shares()
487 if (server, shnum) in known_shares:
488 old_versionid, old_timestamp = known_shares[(server,shnum)]
489 (old_seqnum, old_root_hash, old_salt, old_segsize,
490 old_datalength, old_k, old_N, old_prefix,
491 old_offsets_tuple) = old_versionid
492 writer.set_checkstring(old_seqnum,
495 elif (server, shnum) in self.bad_share_checkstrings:
496 old_checkstring = self.bad_share_checkstrings[(server, shnum)]
497 writer.set_checkstring(old_checkstring)
499 # Our remote shares will not have a complete checkstring until
500 # after we are done writing share data and have started to write
501 # blocks. In the meantime, we need to know what to look for when
502 # writing, so that we can detect UncoordinatedWriteErrors.
503 self._checkstring = self._get_some_writer().get_checkstring()
505 # Now, we start pushing shares.
506 self._status.timings["setup"] = time.time() - self._started
507 # First, we encrypt, encode, and publish the shares that we need
508 # to encrypt, encode, and publish.
510 # This will eventually hold the block hash chain for each share
511 # that we publish. We define it this way so that empty publishes
512 # will still have something to write to the remote slot.
513 self.blockhashes = dict([(i, []) for i in xrange(self.total_shares)])
514 for i in xrange(self.total_shares):
515 blocks = self.blockhashes[i]
516 for j in xrange(self.num_segments):
518 self.sharehash_leaves = None # eventually [sharehashes]
519 self.sharehashes = {} # shnum -> [sharehash leaves necessary to
520 # validate the share]
522 self.log("Starting push")
524 self._state = PUSHING_BLOCKS_STATE
527 return self.done_deferred
529 def _get_some_writer(self):
530 return list(self.writers.values()[0])[0]
532 def _update_status(self):
533 self._status.set_status("Sending Shares: %d placed out of %d, "
534 "%d messages outstanding" %
537 self.num_outstanding))
538 self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
541 def setup_encoding_parameters(self, offset=0):
542 if self._version == MDMF_VERSION:
543 segment_size = DEFAULT_MAX_SEGMENT_SIZE # 128 KiB by default
545 segment_size = self.datalength # SDMF is only one segment
546 # this must be a multiple of self.required_shares
547 segment_size = mathutil.next_multiple(segment_size,
548 self.required_shares)
549 self.segment_size = segment_size
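# (Example of the rounding, assuming k = 3: the 131072-byte MDMF default
# becomes mathutil.next_multiple(131072, 3) == 131073, so every full
# segment splits evenly into k FEC input pieces.)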
551 # Calculate the starting segment for the upload.
553 # We use div_ceil instead of integer division here because
554 # it is semantically correct.
555 # If datalength isn't an even multiple of segment_size, plain
556 # floor division (datalength // segment_size) only counts the
557 # full segments and silently drops the partial segment at the
558 # end. That's not what we want, because it ignores the extra
559 # data. div_ceil rounds up, so the tail data gets a segment of
560 # its own.
562 self.num_segments = mathutil.div_ceil(self.datalength,
565 self.starting_segment = offset // segment_size
568 self.num_segments = 0
569 self.starting_segment = 0
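# (Worked example of the div_ceil comment above, with made-up sizes:
# for datalength 300000 and segment_size 131073, 300000 // 131073 == 2
# would drop the partial tail, while mathutil.div_ceil(300000, 131073)
# == 3 counts it.)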
572 self.log("building encoding parameters for file")
573 self.log("got segsize %d" % self.segment_size)
574 self.log("got %d segments" % self.num_segments)
576 if self._version == SDMF_VERSION:
577 assert self.num_segments in (0, 1) # SDMF
578 # calculate the tail segment size.
580 if segment_size and self.datalength:
581 self.tail_segment_size = self.datalength % segment_size
582 self.log("got tail segment size %d" % self.tail_segment_size)
584 self.tail_segment_size = 0
586 if self.tail_segment_size == 0 and segment_size:
587 # The tail segment is the same size as the other segments.
588 self.tail_segment_size = segment_size
591 fec = codec.CRSEncoder()
592 fec.set_params(self.segment_size,
593 self.required_shares, self.total_shares)
594 self.piece_size = fec.get_block_size()
597 if self.tail_segment_size == self.segment_size:
598 self.tail_fec = self.fec
600 tail_fec = codec.CRSEncoder()
601 tail_fec.set_params(self.tail_segment_size,
602 self.required_shares,
604 self.tail_fec = tail_fec
606 self._current_segment = self.starting_segment
607 self.end_segment = self.num_segments - 1
608 # Now figure out where the last segment should be.
609 if self.data.get_size() != self.datalength:
610 # We're updating a few segments in the middle of a mutable
611 # file, so we don't want to republish the whole thing.
612 # (we don't have enough data to do that even if we wanted
614 end = self.data.get_size()
615 self.end_segment = end // segment_size
616 if end % segment_size == 0:
617 self.end_segment -= 1
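# (Sketch of the adjustment, with made-up sizes: if segment_size is
# 131073 and the updated data ends at byte 262146, exactly two segments
# are touched, so end // segment_size == 2 is decremented to
# end_segment 1; if the data ended one byte later, segment 2 really
# would be touched and no decrement happens.)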
619 self.log("got start segment %d" % self.starting_segment)
620 self.log("got end segment %d" % self.end_segment)
623 def _push(self, ignored=None):
625 I manage state transitions. In particular, I see that we still
626 have a good enough number of writers to complete the upload
629 # Can we still successfully publish this file?
630 # TODO: Keep track of outstanding queries before aborting the
632 num_shnums = len(self.writers)
633 if num_shnums < self.required_shares or self.surprised:
634 return self._failure()
636 # Figure out what we need to do next. Each of these needs to
637 # return a deferred so that we don't block execution when this
638 # is first called in the upload method.
639 if self._state == PUSHING_BLOCKS_STATE:
640 return self.push_segment(self._current_segment)
642 elif self._state == PUSHING_EVERYTHING_ELSE_STATE:
643 return self.push_everything_else()
645 # If we make it to this point, we were successful in placing the
650 def push_segment(self, segnum):
651 if self.num_segments == 0 and self._version == SDMF_VERSION:
652 self._add_dummy_salts()
654 if segnum > self.end_segment:
655 # We don't have any more segments to push.
656 self._state = PUSHING_EVERYTHING_ELSE_STATE
659 d = self._encode_segment(segnum)
660 d.addCallback(self._push_segment, segnum)
661 def _increment_segnum(ign):
662 self._current_segment += 1
663 # XXX: I don't think we need to do addBoth here -- any errBacks
664 # should be handled within push_segment.
665 d.addCallback(_increment_segnum)
666 d.addCallback(self._turn_barrier)
667 d.addCallback(self._push)
668 d.addErrback(self._failure)
671 def _turn_barrier(self, result):
673 I help the publish process avoid the recursion limit issues
676 return fireEventually(result)
679 def _add_dummy_salts(self):
681 SDMF files need a salt even if they're empty, or the signature
682 won't make sense. This method adds a dummy salt to each of our
683 SDMF writers so that they can write the signature later.
685 salt = os.urandom(16)
686 assert self._version == SDMF_VERSION
688 for shnum, writers in self.writers.iteritems():
689 for writer in writers:
690 writer.put_salt(salt)
693 def _encode_segment(self, segnum):
695 I encrypt and encode the segment segnum.
697 started = time.time()
699 if segnum + 1 == self.num_segments:
700 segsize = self.tail_segment_size
702 segsize = self.segment_size
705 self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
706 data = self.data.read(segsize)
707 # XXX: This is dumb. Why return a list?
710 assert len(data) == segsize, len(data)
712 salt = os.urandom(16)
714 key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
715 self._status.set_status("Encrypting")
717 crypttext = enc.process(data)
718 assert len(crypttext) == len(data)
721 self._status.accumulate_encrypt_time(now - started)
725 if segnum + 1 == self.num_segments:
730 self._status.set_status("Encoding")
731 crypttext_pieces = [None] * self.required_shares
732 piece_size = fec.get_block_size()
733 for i in range(len(crypttext_pieces)):
734 offset = i * piece_size
735 piece = crypttext[offset:offset+piece_size]
736 piece = piece + "\x00"*(piece_size - len(piece)) # padding
737 crypttext_pieces[i] = piece
738 assert len(piece) == piece_size
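# (Illustration of the padding, with made-up numbers: if piece_size
# comes out to 334 for a 1000-byte segment with k == 3, the third slice
# is only 332 bytes long and picks up two NUL bytes of padding before
# being handed to FEC.)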
739 d = fec.encode(crypttext_pieces)
740 def _done_encoding(res):
741 elapsed = time.time() - started
742 self._status.accumulate_encode_time(elapsed)
744 d.addCallback(_done_encoding)
748 def _push_segment(self, encoded_and_salt, segnum):
750 I push (data, salt) as segment number segnum.
752 results, salt = encoded_and_salt
753 shares, shareids = results
754 self._status.set_status("Pushing segment")
755 for i in xrange(len(shares)):
756 sharedata = shares[i]
757 shareid = shareids[i]
758 if self._version == MDMF_VERSION:
759 hashed = salt + sharedata
762 block_hash = hashutil.block_hash(hashed)
763 self.blockhashes[shareid][segnum] = block_hash
764 # find the writer for this share
765 writers = self.writers[shareid]
766 for writer in writers:
767 writer.put_block(sharedata, segnum, salt)
770 def push_everything_else(self):
772 I put everything else associated with a share.
774 self._pack_started = time.time()
775 self.push_encprivkey()
776 self.push_blockhashes()
777 self.push_sharehashes()
778 self.push_toplevel_hashes_and_signature()
779 d = self.finish_publishing()
780 def _change_state(ignored):
781 self._state = DONE_STATE
782 d.addCallback(_change_state)
783 d.addCallback(self._push)
787 def push_encprivkey(self):
788 encprivkey = self._encprivkey
789 self._status.set_status("Pushing encrypted private key")
790 for shnum, writers in self.writers.iteritems():
791 for writer in writers:
792 writer.put_encprivkey(encprivkey)
795 def push_blockhashes(self):
796 self.sharehash_leaves = [None] * len(self.blockhashes)
797 self._status.set_status("Building and pushing block hash tree")
798 for shnum, blockhashes in self.blockhashes.iteritems():
799 t = hashtree.HashTree(blockhashes)
800 self.blockhashes[shnum] = list(t)
801 # set the leaf for future use.
802 self.sharehash_leaves[shnum] = t[0]
804 writers = self.writers[shnum]
805 for writer in writers:
806 writer.put_blockhashes(self.blockhashes[shnum])
809 def push_sharehashes(self):
810 self._status.set_status("Building and pushing share hash chain")
811 share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
812 for shnum in xrange(len(self.sharehash_leaves)):
813 needed_indices = share_hash_tree.needed_hashes(shnum)
814 self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
815 for i in needed_indices] )
816 writers = self.writers[shnum]
817 for writer in writers:
818 writer.put_sharehashes(self.sharehashes[shnum])
819 self.root_hash = share_hash_tree[0]
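# (For orientation: the hash trees here keep the root at index 0 -- the
# block hash tree's t[0] above becomes a leaf of the share hash tree,
# and share_hash_tree[0] is the root written into every share --
# while needed_hashes(shnum) names the sibling/uncle nodes a reader
# must have to validate leaf shnum against that root.)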
822 def push_toplevel_hashes_and_signature(self):
823 # We need to do three things here:
824 # - Push the root hash and salt hash
825 # - Get the checkstring of the resulting layout; sign that.
826 # - Push the signature
827 self._status.set_status("Pushing root hashes and signature")
828 for shnum in xrange(self.total_shares):
829 writers = self.writers[shnum]
830 for writer in writers:
831 writer.put_root_hash(self.root_hash)
832 self._update_checkstring()
833 self._make_and_place_signature()
836 def _update_checkstring(self):
838 After putting the root hash, MDMF files will have the
839 checkstring written to the storage server. This means that we
840 can update our copy of the checkstring so we can detect
841 uncoordinated writes. SDMF files will have the same checkstring,
842 so we need not do anything.
844 self._checkstring = self._get_some_writer().get_checkstring()
847 def _make_and_place_signature(self):
849 I create and place the signature.
851 started = time.time()
852 self._status.set_status("Signing prefix")
853 signable = self._get_some_writer().get_signable()
854 self.signature = self._privkey.sign(signable)
856 for (shnum, writers) in self.writers.iteritems():
857 for writer in writers:
858 writer.put_signature(self.signature)
859 self._status.timings['sign'] = time.time() - started
862 def finish_publishing(self):
863 # We're almost done -- we just need to put the verification key
865 started = time.time()
866 self._status.set_status("Pushing shares")
867 self._started_pushing = started
869 verification_key = self._pubkey.serialize()
871 for (shnum, writers) in self.writers.copy().iteritems():
872 for writer in writers:
873 writer.put_verification_key(verification_key)
874 self.num_outstanding += 1
875 def _no_longer_outstanding(res):
876 self.num_outstanding -= 1
879 d = writer.finish_publishing()
880 d.addBoth(_no_longer_outstanding)
881 d.addErrback(self._connection_problem, writer)
882 d.addCallback(self._got_write_answer, writer, started)
884 self._record_verinfo()
885 self._status.timings['pack'] = time.time() - started
886 return defer.DeferredList(ds)
889 def _record_verinfo(self):
890 self.versioninfo = self._get_some_writer().get_verinfo()
893 def _connection_problem(self, f, writer):
895 We ran into a connection problem while working with writer, and
896 need to deal with that.
898 self.log("found problem: %s" % str(f))
899 self._last_failure = f
900 self.writers.discard(writer.shnum, writer)
903 def log_goal(self, goal, message=""):
905 for (shnum, server) in sorted([(s,p) for (p,s) in goal]):
906 logmsg.append("sh%d to [%s]" % (shnum, server.get_name()))
907 self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
908 self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
911 def update_goal(self):
912 # if log.recording_noisy
914 self.log_goal(self.goal, "before update: ")
916 # first, remove any bad servers from our goal
917 self.goal = set([ (server, shnum)
918 for (server, shnum) in self.goal
919 if server not in self.bad_servers ])
921 # find the homeless shares:
922 homefull_shares = set([shnum for (server, shnum) in self.goal])
923 homeless_shares = set(range(self.total_shares)) - homefull_shares
924 homeless_shares = sorted(list(homeless_shares))
925 # place them somewhere. We prefer unused servers at the beginning of
926 # the available server list.
928 if not homeless_shares:
931 # if an old share X is on a node, put the new share X there too.
932 # TODO: 1: redistribute shares to achieve one-per-server, by copying
933 # shares from existing servers to new (less-crowded) ones. The
934 # old shares must still be updated.
935 # TODO: 2: move those shares instead of copying them, to reduce future
938 # this is a bit CPU intensive but easy to analyze. We create a sort
939 # order for each server. If the server is marked as bad, we don't
940 # even put them in the list. Then we care about the number of shares
941 # which have already been assigned to them. After that we care about
942 # their permutation order.
943 old_assignments = DictOfSets()
944 for (server, shnum) in self.goal:
945 old_assignments.add(server, shnum)
948 for i, server in enumerate(self.full_serverlist):
949 serverid = server.get_serverid()
950 if server in self.bad_servers:
952 entry = (len(old_assignments.get(server, [])), i, serverid, server)
953 serverlist.append(entry)
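# (Sketch of the intended ordering: a server already holding two shares
# at permuted index 0 sorts as (2, 0, ...), while an empty server at
# permuted index 1 sorts as (0, 1, ...), so the empty server comes
# first and receives homeless shares before the crowded one.)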
957 raise NotEnoughServersError("Ran out of non-bad servers, "
959 str(self._first_write_error),
960 self._first_write_error)
962 # we then index this serverlist with an integer, because we may have
963 # to wrap. We update the goal as we go.
965 for shnum in homeless_shares:
966 (ignored1, ignored2, ignored3, server) = serverlist[i]
967 # if we are forced to send a share to a server that already has
968 # one, we may have two write requests in flight, and the
969 # servermap (which was computed before either request was sent)
970 # won't reflect the new shares, so the second response will be
971 # surprising. There is code in _got_write_answer() to tolerate
972 # this, otherwise it would cause the publish to fail with an
973 # UncoordinatedWriteError. See #546 for details of the trouble
974 # this used to cause.
975 self.goal.add( (server, shnum) )
977 if i >= len(serverlist):
980 self.log_goal(self.goal, "after update: ")
983 def _got_write_answer(self, answer, writer, started):
985 # SDMF writers only pretend to write when callers set their
986 # blocks, salts, and so on -- they actually just write once,
987 # at the end of the upload process. In fake writes, they
988 # return defer.succeed(None). If we see that, we shouldn't
989 # bother checking it.
992 server = writer.server
993 lp = self.log("_got_write_answer from %s, share %d" %
994 (server.get_name(), writer.shnum))
997 elapsed = now - started
999 self._status.add_per_server_time(server, elapsed)
1001 wrote, read_data = answer
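# (As the unpacking just above suggests: each answer is a
# (success, read_data) pair from the server's test-vector write, where
# read_data maps every shnum the server holds to a list of read
# vectors -- here just the packed checkstring.)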
1003 surprise_shares = set(read_data.keys()) - set([writer.shnum])
1005 # We need to remove from surprise_shares any shares that we are
1006 # knowingly also writing to that server from other writers.
1008 # TODO: Precompute this.
1010 for shnum, writers in self.writers.iteritems():
1011 shares.extend([x.shnum for x in writers if x.server == server])
1012 known_shnums = set(shares)
1013 surprise_shares -= known_shnums
1014 self.log("found the following surprise shares: %s" %
1015 str(surprise_shares))
1017 # Now surprise shares contains all of the shares that we did not
1018 # expect to be there.
1021 for shnum in surprise_shares:
1022 # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
1023 checkstring = read_data[shnum][0]
1024 # What we want to do here is to see if their (seqnum,
1025 # roothash, salt) is the same as our (seqnum, roothash,
1026 # salt), or the equivalent for MDMF. The best way to do this
1027 # is to store a packed representation of our checkstring
1028 # somewhere, then not bother unpacking the other
1030 if checkstring == self._checkstring:
1031 # they have the right share, somehow
1033 if (server,shnum) in self.goal:
1034 # and we want them to have it, so we probably sent them a
1035 # copy in an earlier write. This is ok, and avoids the
1039 # They aren't in our goal, but they are still for the right
1040 # version. Somebody else wrote them, and it's a convergent
1041 # uncoordinated write. Pretend this is ok (don't be
1042 # surprised), since I suspect there's a decent chance that
1043 # we'll hit this in normal operation.
1047 # the new shares are of a different version
1048 if server in self._servermap.get_reachable_servers():
1049 # we asked them about their shares, so we had knowledge
1050 # of what they used to have. Any surprising shares must
1051 # have come from someone else, so UCW.
1054 # we didn't ask them, and now we've discovered that they
1055 # have a share we didn't know about. This indicates that
1056 # mapupdate should have worked harder and asked more
1057 # servers before concluding that it knew about them all.
1059 # signal UCW, but make sure to ask this server next time,
1060 # so we'll remember to update it if/when we retry.
1062 # TODO: ask this server next time. I don't yet have a good
1063 # way to do this. Two insufficient possibilities are:
1065 # self._servermap.add_new_share(server, shnum, verinfo, now)
1066 # but that requires fetching/validating/parsing the whole
1067 # version string, and all we have is the checkstring
1068 # self._servermap.mark_bad_share(server, shnum, checkstring)
1069 # that will make publish overwrite the share next time,
1070 # but it won't re-query the server, and it won't make
1071 # mapupdate search further
1073 # TODO later: when publish starts, do
1074 # servermap.get_best_version(), extract the seqnum,
1075 # subtract one, and store as highest-replaceable-seqnum.
1076 # Then, if this surprise-because-we-didn't-ask share is
1077 # of highest-replaceable-seqnum or lower, we're allowed
1078 # to replace it: send out a new writev (or rather add it
1079 # to self.goal and loop).
1085 self.log("they had shares %s that we didn't know about" %
1086 (list(surprise_shares),),
1087 parent=lp, level=log.WEIRD, umid="un9CSQ")
1088 self.surprised = True
1091 # TODO: there are two possibilities. The first is that the server
1092 # is full (or just doesn't want to give us any room), which means
1093 # we shouldn't ask them again, but is *not* an indication of an
1094 # uncoordinated write. The second is that our testv failed, which
1095 # *does* indicate an uncoordinated write. We currently don't have
1096 # a way to tell these two apart (in fact, the storage server code
1097 # doesn't have the option of refusing our share).
1099 # If the server is full, mark the server as bad (so we don't ask
1100 # them again), but don't set self.surprised. The loop() will find
1103 # If the testv failed, log it, set self.surprised, but don't
1104 # bother adding to self.bad_servers .
1106 self.log("our testv failed, so the write did not happen",
1107 parent=lp, level=log.WEIRD, umid="8sc26g")
1108 self.surprised = True
1109 self.bad_servers.add(server) # don't ask them again
1110 # use the checkstring to add information to the log message
1111 unknown_format = False
1112 for (shnum,readv) in read_data.items():
1113 checkstring = readv[0]
1114 version = get_version_from_checkstring(checkstring)
1115 if version == MDMF_VERSION:
1117 other_roothash) = unpack_mdmf_checkstring(checkstring)
1118 elif version == SDMF_VERSION:
1121 other_IV) = unpack_sdmf_checkstring(checkstring)
1123 unknown_format = True
1124 expected_version = self._servermap.version_on_server(server,
1126 if expected_version:
1127 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
1128 offsets_tuple) = expected_version
1129 msg = ("somebody modified the share on us:"
1130 " shnum=%d: I thought they had #%d:R=%s," %
1132 seqnum, base32.b2a(root_hash)[:4]))
1134 msg += (" but I don't know how to read share"
1135 " format %d" % version)
1137 msg += " but testv reported #%d:R=%s" % \
1138 (other_seqnum, other_roothash)
1139 self.log(msg, parent=lp, level=log.NOISY)
1140 # if expected_version==None, then we didn't expect to see a
1141 # share on that server, and the 'surprise_shares' clause
1142 # above will have logged it.
1145 # and update the servermap
1146 # self.versioninfo is set during the last phase of publishing.
1147 # If we get there, we know that responses correspond to placed
1148 # shares, and can safely execute these statements.
1149 if self.versioninfo:
1150 self.log("wrote successfully: adding new share to servermap")
1151 self._servermap.add_new_share(server, writer.shnum,
1152 self.versioninfo, started)
1153 self.placed.add( (server, writer.shnum) )
1154 self._update_status()
1155 # the next method in the deferred chain will check to see if
1156 # we're done and successful.
1161 if not self._running:
1163 self._running = False
1165 self._status.timings["total"] = now - self._started
1167 elapsed = now - self._started_pushing
1168 self._status.timings['push'] = elapsed
1170 self._status.set_active(False)
1171 self.log("Publish done, success")
1172 self._status.set_status("Finished")
1173 self._status.set_progress(1.0)
1174 # Get k and segsize, then give them to the caller.
1176 hints['segsize'] = self.segment_size
1177 hints['k'] = self.required_shares
1178 self._node.set_downloader_hints(hints)
1179 eventually(self.done_deferred.callback, None)
1181 def _failure(self, f=None):
1183 self._last_failure = f
1185 if not self.surprised:
1186 # We ran out of servers
1187 msg = "Publish ran out of good servers"
1188 if self._last_failure:
1189 msg += ", last failure was: %s" % str(self._last_failure)
1191 e = NotEnoughServersError(msg)
1194 # We ran into shares that we didn't recognize, which means
1195 # that we need to return an UncoordinatedWriteError.
1196 self.log("Publish failed with UncoordinatedWriteError")
1197 e = UncoordinatedWriteError()
1198 f = failure.Failure(e)
1199 eventually(self.done_deferred.callback, f)
1202 class MutableFileHandle:
1204 I am a mutable uploadable built around a filehandle-like object,
1205 usually either a StringIO instance or a handle to an actual file.
1207 implements(IMutableUploadable)
1209 def __init__(self, filehandle):
1210 # The filehandle is defined as a generic file-like object that
1211 # has these two methods. We don't care beyond that.
1212 assert hasattr(filehandle, "read")
1213 assert hasattr(filehandle, "close")
1215 self._filehandle = filehandle
1216 # We must start reading at the beginning of the file, or we risk
1217 # encountering errors when the data read does not match the size
1218 # reported to the uploader.
1219 self._filehandle.seek(0)
1221 # We have not yet read anything, so our position is 0.
1227 I return the amount of data in my filehandle.
1229 if not hasattr(self, "_size"):
1230 old_position = self._filehandle.tell()
1231 # Seek to the end of the file by seeking 0 bytes from the
1233 self._filehandle.seek(0, os.SEEK_END)
1234 self._size = self._filehandle.tell()
1235 # Restore the previous position, in case this was called
1237 self._filehandle.seek(old_position)
1238 assert self._filehandle.tell() == old_position
1240 assert hasattr(self, "_size")
1246 I return the position of my read marker -- i.e., how much data I
1247 have already read and returned to callers.
1252 def read(self, length):
1254 I return some data (up to length bytes) from my filehandle.
1256 In most cases, I return length bytes, but sometimes I won't --
1257 for example, if I am asked to read beyond the end of a file, or
1260 results = self._filehandle.read(length)
1261 self._marker += len(results)
1267 I close the underlying filehandle. Any further operations on the
1268 filehandle fail at this point.
1270 self._filehandle.close()
1273 class MutableData(MutableFileHandle):
1275 I am a mutable uploadable built around a string, which I then cast
1276 into a StringIO and treat as a filehandle.
1279 def __init__(self, s):
1280 # Take a string and return a file-like uploadable.
1281 assert isinstance(s, str)
1283 MutableFileHandle.__init__(self, StringIO(s))
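# (Usage sketch, not from the original source: wrapping data for a
# publish looks roughly like
#     uploadable = MutableData("new contents")          # in-memory bytes
#     uploadable = MutableFileHandle(open(path, "rb"))  # or a real file
# where 'path' is a hypothetical filename; either object can then be
# handed to code that expects an IMutableUploadable, such as
# Publish.publish() above.)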
1286 class TransformingUploadable:
1288 I am an IMutableUploadable that wraps another IMutableUploadable,
1289 and some segments that are already on the grid. When I am called to
1290 read, I handle merging of boundary segments.
1292 implements(IMutableUploadable)
1295 def __init__(self, data, offset, segment_size, start, end):
1296 assert IMutableUploadable.providedBy(data)
1298 self._newdata = data
1299 self._offset = offset
1300 self._segment_size = segment_size
1304 self._read_marker = 0
1306 self._first_segment_offset = offset % segment_size
1308 num = self.log("TransformingUploadable: starting", parent=None)
1309 self._log_number = num
1310 self.log("got fso: %d" % self._first_segment_offset)
1311 self.log("got offset: %d" % self._offset)
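# (Worked example, with made-up sizes: for segment_size 131073 and
# offset 200000, the first segment offset is 200000 % 131073 == 68927,
# so the first 68927 bytes that read() returns come from the old start
# segment before any new data is produced.)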
1314 def log(self, *args, **kwargs):
1315 if 'parent' not in kwargs:
1316 kwargs['parent'] = self._log_number
1317 if "facility" not in kwargs:
1318 kwargs["facility"] = "tahoe.mutable.transforminguploadable"
1319 return log.msg(*args, **kwargs)
1323 return self._offset + self._newdata.get_size()
1326 def read(self, length):
1327 # We can get data from 3 sources here.
1328 # 1. The first of the segments provided to us.
1329 # 2. The data that we're replacing things with.
1330 # 3. The last of the segments provided to us.
1332 # are we in state 0?
1333 self.log("reading %d bytes" % length)
1336 old_data_length = self._first_segment_offset - self._read_marker
1337 if old_data_length > 0:
1338 if old_data_length > length:
1339 old_data_length = length
1340 self.log("returning %d bytes of old start data" % old_data_length)
1342 old_data_end = old_data_length + self._read_marker
1343 old_start_data = self._start[self._read_marker:old_data_end]
1344 length -= old_data_length
1346 # otherwise calculations later get screwed up.
1349 # Is there enough new data to satisfy this read? If not, we need
1350 # to pad the end of the data with data from our last segment.
1351 old_end_length = length - \
1352 (self._newdata.get_size() - self._newdata.pos())
1354 if old_end_length > 0:
1355 self.log("reading %d bytes of old end data" % old_end_length)
1357 # TODO: We're not explicitly checking for tail segment size
1358 # here. Is that a problem?
1359 old_data_offset = (length - old_end_length + \
1360 old_data_length) % self._segment_size
1361 self.log("reading at offset %d" % old_data_offset)
1362 old_end = old_data_offset + old_end_length
1363 old_end_data = self._end[old_data_offset:old_end]
1364 length -= old_end_length
1365 assert length == self._newdata.get_size() - self._newdata.pos()
1367 self.log("reading %d bytes of new data" % length)
1368 new_data = self._newdata.read(length)
1369 new_data = "".join(new_data)
1371 self._read_marker += len(old_start_data + new_data + old_end_data)
1373 return old_start_data + new_data + old_end_data