4 from StringIO import StringIO
5 from itertools import count
6 from zope.interface import implements
7 from twisted.internet import defer
8 from twisted.python import failure
9 from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION, \
11 from allmydata.util import base32, hashutil, mathutil, log
12 from allmydata.util.dictutil import DictOfSets
13 from allmydata import hashtree, codec
14 from allmydata.storage.server import si_b2a
15 from pycryptopp.cipher.aes import AES
16 from foolscap.api import eventually, fireEventually
18 from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, MODE_REPAIR, \
19 UncoordinatedWriteError, NotEnoughServersError
20 from allmydata.mutable.servermap import ServerMap
21 from allmydata.mutable.layout import get_version_from_checkstring,\
22 unpack_mdmf_checkstring, \
23 unpack_sdmf_checkstring, \
28 DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
29 PUSHING_BLOCKS_STATE = 0
30 PUSHING_EVERYTHING_ELSE_STATE = 1
34 implements(IPublishStatus)
35 statusid_counter = count(0)
38 self.timings["send_per_server"] = {}
39 self.timings["encrypt"] = 0.0
40 self.timings["encode"] = 0.0
44 self.storage_index = None
46 self.encoding = ("?", "?")
48 self.status = "Not started"
50 self.counter = self.statusid_counter.next()
51 self.started = time.time()
53 def add_per_server_time(self, server, elapsed):
54 if server not in self.timings["send_per_server"]:
55 self.timings["send_per_server"][server] = []
56 self.timings["send_per_server"][server].append(elapsed)
57 def accumulate_encode_time(self, elapsed):
58 self.timings["encode"] += elapsed
59 def accumulate_encrypt_time(self, elapsed):
60 self.timings["encrypt"] += elapsed
62 def get_started(self):
64 def get_storage_index(self):
65 return self.storage_index
66 def get_encoding(self):
68 def using_helper(self):
70 def get_servermap(self):
76 def get_progress(self):
80 def get_counter(self):
82 def get_problems(self):
85 def set_storage_index(self, si):
86 self.storage_index = si
87 def set_helper(self, helper):
89 def set_servermap(self, servermap):
90 self.servermap = servermap
91 def set_encoding(self, k, n):
92 self.encoding = (k, n)
93 def set_size(self, size):
95 def set_status(self, status):
97 def set_progress(self, value):
99 def set_active(self, value):
102 class LoopLimitExceededError(Exception):
106 """I represent a single act of publishing the mutable file to the grid. I
107 will only publish my data if the servermap I am using still represents
108 the current state of the world.
110 To make the initial publish, set servermap to None.
113 def __init__(self, filenode, storage_broker, servermap):
114 self._node = filenode
115 self._storage_broker = storage_broker
116 self._servermap = servermap
117 self._storage_index = self._node.get_storage_index()
118 self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
119 num = self.log("Publish(%s): starting" % prefix, parent=None)
120 self._log_number = num
122 self._first_write_error = None
123 self._last_failure = None
125 self._status = PublishStatus()
126 self._status.set_storage_index(self._storage_index)
127 self._status.set_helper(False)
128 self._status.set_progress(0.0)
129 self._status.set_active(True)
130 self._version = self._node.get_version()
131 assert self._version in (SDMF_VERSION, MDMF_VERSION)
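# Illustrative usage sketch (added; not part of the original source). The
# filenode, storage_broker, and servermap names below are the constructor
# arguments above, and MutableData is the helper class defined near the end
# of this file:
#
#   p = Publish(filenode, storage_broker, servermap)  # servermap=None for a first publish
#   d = p.publish(MutableData("new contents"))
#   # d fires with None when the publish is done, or with a Failure wrapping
#   # UncoordinatedWriteError / NotEnoughServersError if it went wrong.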
134 def get_status(self):
137 def log(self, *args, **kwargs):
138 if 'parent' not in kwargs:
139 kwargs['parent'] = self._log_number
140 if "facility" not in kwargs:
141 kwargs["facility"] = "tahoe.mutable.publish"
142 return log.msg(*args, **kwargs)
145 def update(self, data, offset, blockhashes, version):
147 I replace the contents of this file with the contents of data,
148 starting at offset. I return a Deferred that fires with None
149 when the replacement has been completed, or with an error if
150 something went wrong during the process.
152 Note that this process will not upload new shares. If the file
153 being updated is in need of repair, callers will have to repair
157 # 1: Make server assignments. We'll assign each share that we know
158 # about on the grid to the server that currently holds it, and
159 # will not place any new shares.
160 # 2: Setup encoding parameters. Most of these will stay the same
161 # -- datalength will change, as will some of the offsets.
162 # 3. Upload the new segments.
164 assert IMutableUploadable.providedBy(data)
168 # XXX: Use the MutableFileVersion instead.
169 self.datalength = self._node.get_size()
170 if data.get_size() > self.datalength:
171 self.datalength = data.get_size()
173 self.log("starting update")
174 self.log("adding new data of length %d at offset %d" % \
175 (data.get_size(), offset))
176 self.log("new data length is %d" % self.datalength)
177 self._status.set_size(self.datalength)
178 self._status.set_status("Started")
179 self._started = time.time()
181 self.done_deferred = defer.Deferred()
183 self._writekey = self._node.get_writekey()
184 assert self._writekey, "need write capability to publish"
186 # first, which servers will we publish to? We require that the
187 # servermap was updated in MODE_WRITE, so we can depend upon the
188 # serverlist computed by that process instead of computing our own.
189 assert self._servermap
190 assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
191 # we will push a version that is one larger than anything present
192 # in the grid, according to the servermap.
193 self._new_seqnum = self._servermap.highest_seqnum() + 1
194 self._status.set_servermap(self._servermap)
196 self.log(format="new seqnum will be %(seqnum)d",
197 seqnum=self._new_seqnum, level=log.NOISY)
199 # We're updating an existing file, so all of the following
200 # should be available.
201 self.readkey = self._node.get_readkey()
202 self.required_shares = self._node.get_required_shares()
203 assert self.required_shares is not None
204 self.total_shares = self._node.get_total_shares()
205 assert self.total_shares is not None
206 self._status.set_encoding(self.required_shares, self.total_shares)
208 self._pubkey = self._node.get_pubkey()
210 self._privkey = self._node.get_privkey()
212 self._encprivkey = self._node.get_encprivkey()
214 sb = self._storage_broker
215 full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
216 self.full_serverlist = full_serverlist # for use later, immutable
217 self.bad_servers = set() # servers who have errbacked/refused requests
219 # This will set self.segment_size, self.num_segments, and
220 # self.fec. TODO: Does it know how to do the offset? Probably
221 # not. So do that part next.
222 self.setup_encoding_parameters(offset=offset)
224 # if we experience any surprises (writes which were rejected because
225 # our test vector did not match, or shares which we didn't expect to
226 # see), we set this flag and report an UncoordinatedWriteError at the
227 # end of the publish process.
228 self.surprised = False
230 # we keep track of three tables. The first is our goal: which share
231 # we want to see on which servers. This is initially populated by the
232 # existing servermap.
233 self.goal = set() # pairs of (server, shnum) tuples
235 # the number of outstanding queries: those that are in flight and
236 # may or may not be delivered, accepted, or acknowledged. This is
237 # incremented when a query is sent, and decremented when the response
238 # returns or errbacks.
239 self.num_outstanding = 0
241 # the third is a table of successes: shares which have actually been
242 # placed. These are populated when responses come back with success.
243 # When self.placed == self.goal, we're done.
244 self.placed = set() # (server, shnum) tuples
246 self.bad_share_checkstrings = {}
248 # This is set at the last step of the publishing process.
249 self.versioninfo = ""
251 # we use the servermap to populate the initial goal: this way we will
252 # try to update each existing share in place. Since we're
253 # updating, we ignore damaged and missing shares -- callers must
254 # do a repair to recreate these.
255 self.goal = set(self._servermap.get_known_shares())
258 # SDMF files are updated differently.
259 self._version = MDMF_VERSION
260 writer_class = MDMFSlotWriteProxy
262 # For each (server, shnum) in self.goal, we make a
263 # write proxy for that server. We'll use this to write
264 # shares to the server.
265 for (server,shnum) in self.goal:
266 write_enabler = self._node.get_write_enabler(server)
267 renew_secret = self._node.get_renewal_secret(server)
268 cancel_secret = self._node.get_cancel_secret(server)
269 secrets = (write_enabler, renew_secret, cancel_secret)
271 writer = writer_class(shnum,
276 self.required_shares,
281 self.writers.setdefault(shnum, []).append(writer)
282 writer.server = server
283 known_shares = self._servermap.get_known_shares()
284 assert (server, shnum) in known_shares
285 old_versionid, old_timestamp = known_shares[(server,shnum)]
286 (old_seqnum, old_root_hash, old_salt, old_segsize,
287 old_datalength, old_k, old_N, old_prefix,
288 old_offsets_tuple) = old_versionid
289 writer.set_checkstring(old_seqnum,
293 # Our remote shares will not have a complete checkstring until
294 # after we are done writing share data and have started to write
295 # blocks. In the meantime, we need to know what to look for when
296 # writing, so that we can detect UncoordinatedWriteErrors.
297 self._checkstring = self.writers.values()[0][0].get_checkstring()
299 # Now, we start pushing shares.
300 self._status.timings["setup"] = time.time() - self._started
301 # First, we encrypt, encode, and publish the shares that we need
302 # to encrypt, encode, and publish.
304 # Our update process fetched these for us. We need to update
305 # them in place as publishing happens.
306 self.blockhashes = {} # (shnum, [blockhashes])
307 for (i, bht) in blockhashes.iteritems():
308 # We need to extract the leaves from our old hash tree.
309 old_segcount = mathutil.div_ceil(version[4],
311 h = hashtree.IncompleteHashTree(old_segcount)
312 bht = dict(enumerate(bht))
314 leaves = h[h.get_leaf_index(0):]
315 for j in xrange(self.num_segments - len(leaves)):
318 assert len(leaves) >= self.num_segments
319 self.blockhashes[i] = leaves
320 # This list will now be the leaves that were set during the
321 # initial upload + enough empty hashes to make it a
322 # power-of-two. If we exceed a power of two boundary, we
323 # should be encoding the file over again, and should not be
325 #assert len(self.blockhashes[i]) == \
326 # hashtree.roundup_pow2(self.num_segments), \
327 # len(self.blockhashes[i])
328 # XXX: Except this doesn't work. Figure out why.
330 # These are filled in later, after we've modified the block hash
332 self.sharehash_leaves = None # eventually [sharehashes]
333 self.sharehashes = {} # shnum -> [sharehash leaves necessary to
334 # validate the share]
336 self.log("Starting push")
338 self._state = PUSHING_BLOCKS_STATE
341 return self.done_deferred
344 def publish(self, newdata):
345 """Publish the filenode's current contents. Returns a Deferred that
346 fires (with None) when the publish has done as much work as it's ever
347 going to do, or errbacks with ConsistencyError if it detects a
351 # 0. Setup encoding parameters, encoder, and other such things.
352 # 1. Encrypt, encode, and publish segments.
353 assert IMutableUploadable.providedBy(newdata)
356 self.datalength = newdata.get_size()
357 #if self.datalength >= DEFAULT_MAX_SEGMENT_SIZE:
358 # self._version = MDMF_VERSION
360 # self._version = SDMF_VERSION
362 self.log("starting publish, datalen is %s" % self.datalength)
363 self._status.set_size(self.datalength)
364 self._status.set_status("Started")
365 self._started = time.time()
367 self.done_deferred = defer.Deferred()
369 self._writekey = self._node.get_writekey()
370 assert self._writekey, "need write capability to publish"
372 # first, which servers will we publish to? We require that the
373 # servermap was updated in MODE_WRITE, so we can depend upon the
374 # serverlist computed by that process instead of computing our own.
376 assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
377 # we will push a version that is one larger than anything present
378 # in the grid, according to the servermap.
379 self._new_seqnum = self._servermap.highest_seqnum() + 1
381 # If we don't have a servermap, that's because we're doing the
384 self._servermap = ServerMap()
385 self._status.set_servermap(self._servermap)
387 self.log(format="new seqnum will be %(seqnum)d",
388 seqnum=self._new_seqnum, level=log.NOISY)
390 # having an up-to-date servermap (or using a filenode that was just
391 # created for the first time) also guarantees that the following
392 # fields are available
393 self.readkey = self._node.get_readkey()
394 self.required_shares = self._node.get_required_shares()
395 assert self.required_shares is not None
396 self.total_shares = self._node.get_total_shares()
397 assert self.total_shares is not None
398 self._status.set_encoding(self.required_shares, self.total_shares)
400 self._pubkey = self._node.get_pubkey()
402 self._privkey = self._node.get_privkey()
404 self._encprivkey = self._node.get_encprivkey()
406 sb = self._storage_broker
407 full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
408 self.full_serverlist = full_serverlist # for use later, immutable
409 self.bad_servers = set() # servers who have errbacked/refused requests
411 # This will set self.segment_size, self.num_segments, and
413 self.setup_encoding_parameters()
415 # if we experience any surprises (writes which were rejected because
416 # our test vector did not match, or shares which we didn't expect to
417 # see), we set this flag and report an UncoordinatedWriteError at the
418 # end of the publish process.
419 self.surprised = False
421 # we keep track of three tables. The first is our goal: which share
422 # we want to see on which servers. This is initially populated by the
423 # existing servermap.
424 self.goal = set() # pairs of (server, shnum) tuples
426 # the number of outstanding queries: those that are in flight and
427 # may or may not be delivered, accepted, or acknowledged. This is
428 # incremented when a query is sent, and decremented when the response
429 # returns or errbacks.
430 self.num_outstanding = 0
432 # the third is a table of successes: shares which have actually been
433 # placed. These are populated when responses come back with success.
434 # When self.placed == self.goal, we're done.
435 self.placed = set() # (server, shnum) tuples
437 self.bad_share_checkstrings = {}
439 # This is set at the last step of the publishing process.
440 self.versioninfo = ""
442 # we use the servermap to populate the initial goal: this way we will
443 # try to update each existing share in place.
444 self.goal = set(self._servermap.get_known_shares())
446 # then we add in all the shares that were bad (corrupted, bad
447 # signatures, etc). We want to replace these.
448 for key, old_checkstring in self._servermap.get_bad_shares().items():
449 (server, shnum) = key
450 self.goal.add( (server,shnum) )
451 self.bad_share_checkstrings[(server,shnum)] = old_checkstring
453 # TODO: Make this part do server selection.
456 if self._version == MDMF_VERSION:
457 writer_class = MDMFSlotWriteProxy
459 writer_class = SDMFSlotWriteProxy
461 # For each (server, shnum) in self.goal, we make a
462 # write proxy for that server. We'll use this to write
463 # shares to the server.
464 for (server,shnum) in self.goal:
465 write_enabler = self._node.get_write_enabler(server)
466 renew_secret = self._node.get_renewal_secret(server)
467 cancel_secret = self._node.get_cancel_secret(server)
468 secrets = (write_enabler, renew_secret, cancel_secret)
470 writer = writer_class(shnum,
475 self.required_shares,
479 self.writers.setdefault(shnum, []).append(writer)
480 writer.server = server
481 known_shares = self._servermap.get_known_shares()
482 if (server, shnum) in known_shares:
483 old_versionid, old_timestamp = known_shares[(server,shnum)]
484 (old_seqnum, old_root_hash, old_salt, old_segsize,
485 old_datalength, old_k, old_N, old_prefix,
486 old_offsets_tuple) = old_versionid
487 writer.set_checkstring(old_seqnum,
490 elif (server, shnum) in self.bad_share_checkstrings:
491 old_checkstring = self.bad_share_checkstrings[(server, shnum)]
492 writer.set_checkstring(old_checkstring)
494 # Our remote shares will not have a complete checkstring until
495 # after we are done writing share data and have started to write
496 # blocks. In the meantime, we need to know what to look for when
497 # writing, so that we can detect UncoordinatedWriteErrors.
498 self._checkstring = self.writers.values()[0][0].get_checkstring()
500 # Now, we start pushing shares.
501 self._status.timings["setup"] = time.time() - self._started
502 # First, we encrypt, encode, and publish the shares that we need
503 # to encrypt, encode, and publish.
505 # This will eventually hold the block hash chain for each share
506 # that we publish. We define it this way so that empty publishes
507 # will still have something to write to the remote slot.
508 self.blockhashes = dict([(i, []) for i in xrange(self.total_shares)])
509 for i in xrange(self.total_shares):
510 blocks = self.blockhashes[i]
511 for j in xrange(self.num_segments):
513 self.sharehash_leaves = None # eventually [sharehashes]
514 self.sharehashes = {} # shnum -> [sharehash leaves necessary to
515 # validate the share]
517 self.log("Starting push")
519 self._state = PUSHING_BLOCKS_STATE
522 return self.done_deferred
525 def _update_status(self):
526 self._status.set_status("Sending Shares: %d placed out of %d, "
527 "%d messages outstanding" %
530 self.num_outstanding))
531 self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
534 def setup_encoding_parameters(self, offset=0):
535 if self._version == MDMF_VERSION:
536 segment_size = DEFAULT_MAX_SEGMENT_SIZE # 128 KiB by default
538 segment_size = self.datalength # SDMF is only one segment
539 # this must be a multiple of self.required_shares
540 segment_size = mathutil.next_multiple(segment_size,
541 self.required_shares)
542 self.segment_size = segment_size
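# Worked example (added for illustration): with the default 128 KiB segment
# size and required_shares=3, mathutil.next_multiple(131072, 3) rounds the
# segment size up to 131073, so each full segment splits evenly into 3 FEC
# input pieces.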
544 # Calculate the starting segment for the upload.
546 # We use div_ceil instead of integer division here because
547 # it is semantically correct.
548 # If datalength isn't an even multiple of segment_size,
549 # datalength // segment_size rounds down, counting only the
550 # full segments and ignoring the partial segment that holds
551 # the extra data at the end. div_ceil rounds up instead,
552 # giving us the right number of segments for all of the
553 # data we're pushing.
555 self.num_segments = mathutil.div_ceil(self.datalength,
558 self.starting_segment = offset // segment_size
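# Worked example (added for illustration): with segment_size=131073 and
# datalength=300000, div_ceil(300000, 131073) == 3 segments, whereas plain
# integer division (300000 // 131073 == 2) would drop the final partial
# segment. For an update starting at offset=200000, starting_segment is
# 200000 // 131073 == 1.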
561 self.num_segments = 0
562 self.starting_segment = 0
565 self.log("building encoding parameters for file")
566 self.log("got segsize %d" % self.segment_size)
567 self.log("got %d segments" % self.num_segments)
569 if self._version == SDMF_VERSION:
570 assert self.num_segments in (0, 1) # SDMF
571 # calculate the tail segment size.
573 if segment_size and self.datalength:
574 self.tail_segment_size = self.datalength % segment_size
575 self.log("got tail segment size %d" % self.tail_segment_size)
577 self.tail_segment_size = 0
579 if self.tail_segment_size == 0 and segment_size:
580 # The tail segment is the same size as the other segments.
581 self.tail_segment_size = segment_size
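# Worked example (added for illustration): continuing the numbers above,
# 300000 % 131073 == 37854, so the tail segment carries 37854 bytes. If
# datalength had been an exact multiple of segment_size, the modulo would
# be 0 and the tail segment would be a full-sized segment, as handled here.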
584 fec = codec.CRSEncoder()
585 fec.set_params(self.segment_size,
586 self.required_shares, self.total_shares)
587 self.piece_size = fec.get_block_size()
590 if self.tail_segment_size == self.segment_size:
591 self.tail_fec = self.fec
593 tail_fec = codec.CRSEncoder()
594 tail_fec.set_params(self.tail_segment_size,
595 self.required_shares,
597 self.tail_fec = tail_fec
599 self._current_segment = self.starting_segment
600 self.end_segment = self.num_segments - 1
601 # Now figure out where the last segment should be.
602 if self.data.get_size() != self.datalength:
603 # We're updating a few segments in the middle of a mutable
604 # file, so we don't want to republish the whole thing.
605 # (we don't have enough data to do that even if we wanted
607 end = self.data.get_size()
608 self.end_segment = end // segment_size
609 if end % segment_size == 0:
610 self.end_segment -= 1
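# Worked example (added for illustration): if the data being pushed covers
# only the first 262146 bytes of the 300000-byte file, end == 262146 is an
# exact multiple of 131073, so end_segment == 262146 // 131073 - 1 == 1 and
# later segments are left untouched.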
612 self.log("got start segment %d" % self.starting_segment)
613 self.log("got end segment %d" % self.end_segment)
616 def _push(self, ignored=None):
618 I manage state transitions. In particular, I see that we still
619 have a good enough number of writers to complete the upload
622 # Can we still successfully publish this file?
623 # TODO: Keep track of outstanding queries before aborting the
625 all_shnums = filter(lambda sh: len(self.writers[sh]) > 0,
626 self.writers.iterkeys())
627 if len(all_shnums) < self.required_shares or self.surprised:
628 return self._failure()
630 # Figure out what we need to do next. Each of these needs to
631 # return a deferred so that we don't block execution when this
632 # is first called in the upload method.
633 if self._state == PUSHING_BLOCKS_STATE:
634 return self.push_segment(self._current_segment)
636 elif self._state == PUSHING_EVERYTHING_ELSE_STATE:
637 return self.push_everything_else()
639 # If we make it to this point, we were successful in placing the
644 def push_segment(self, segnum):
645 if self.num_segments == 0 and self._version == SDMF_VERSION:
646 self._add_dummy_salts()
648 if segnum > self.end_segment:
649 # We don't have any more segments to push.
650 self._state = PUSHING_EVERYTHING_ELSE_STATE
653 d = self._encode_segment(segnum)
654 d.addCallback(self._push_segment, segnum)
655 def _increment_segnum(ign):
656 self._current_segment += 1
657 # XXX: I don't think we need to do addBoth here -- any errBacks
658 # should be handled within push_segment.
659 d.addCallback(_increment_segnum)
660 d.addCallback(self._turn_barrier)
661 d.addCallback(self._push)
662 d.addErrback(self._failure)
665 def _turn_barrier(self, result):
667 I help the publish process avoid the recursion limit issues
670 return fireEventually(result)
673 def _add_dummy_salts(self):
675 SDMF files need a salt even if they're empty, or the signature
676 won't make sense. This method adds a dummy salt to each of our
677 SDMF writers so that they can write the signature later.
679 salt = os.urandom(16)
680 assert self._version == SDMF_VERSION
682 for shnum, writers in self.writers.iteritems():
683 for writer in writers:
684 writer.put_salt(salt)
687 def _encode_segment(self, segnum):
689 I encrypt and encode the segment segnum.
691 started = time.time()
693 if segnum + 1 == self.num_segments:
694 segsize = self.tail_segment_size
696 segsize = self.segment_size
699 self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
700 data = self.data.read(segsize)
701 # XXX: This is dumb. Why return a list?
704 assert len(data) == segsize, len(data)
706 salt = os.urandom(16)
708 key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
709 self._status.set_status("Encrypting")
711 crypttext = enc.process(data)
712 assert len(crypttext) == len(data)
715 self._status.accumulate_encrypt_time(now - started)
719 if segnum + 1 == self.num_segments:
724 self._status.set_status("Encoding")
725 crypttext_pieces = [None] * self.required_shares
726 piece_size = fec.get_block_size()
727 for i in range(len(crypttext_pieces)):
728 offset = i * piece_size
729 piece = crypttext[offset:offset+piece_size]
730 piece = piece + "\x00"*(piece_size - len(piece)) # padding
731 crypttext_pieces[i] = piece
732 assert len(piece) == piece_size
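# Worked example (added for illustration): for a tail segment of 25 bytes
# with required_shares=3, piece_size would be 9, so the crypttext is sliced
# at offsets 0, 9, and 18 into pieces of 9, 9, and 7 bytes, and the last
# piece is padded with 2 NUL bytes so every FEC input block is exactly
# piece_size bytes long.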
733 d = fec.encode(crypttext_pieces)
734 def _done_encoding(res):
735 elapsed = time.time() - started
736 self._status.accumulate_encode_time(elapsed)
738 d.addCallback(_done_encoding)
742 def _push_segment(self, encoded_and_salt, segnum):
744 I push (data, salt) as segment number segnum.
746 results, salt = encoded_and_salt
747 shares, shareids = results
748 self._status.set_status("Pushing segment")
749 for i in xrange(len(shares)):
750 sharedata = shares[i]
751 shareid = shareids[i]
752 if self._version == MDMF_VERSION:
753 hashed = salt + sharedata
756 block_hash = hashutil.block_hash(hashed)
757 self.blockhashes[shareid][segnum] = block_hash
758 # find the writer for this share
759 writers = self.writers[shareid]
760 for writer in writers:
761 writer.put_block(sharedata, segnum, salt)
764 def push_everything_else(self):
766 I put everything else associated with a share.
768 self._pack_started = time.time()
769 self.push_encprivkey()
770 self.push_blockhashes()
771 self.push_sharehashes()
772 self.push_toplevel_hashes_and_signature()
773 d = self.finish_publishing()
774 def _change_state(ignored):
775 self._state = DONE_STATE
776 d.addCallback(_change_state)
777 d.addCallback(self._push)
781 def push_encprivkey(self):
782 encprivkey = self._encprivkey
783 self._status.set_status("Pushing encrypted private key")
784 for shnum, writers in self.writers.iteritems():
785 for writer in writers:
786 writer.put_encprivkey(encprivkey)
789 def push_blockhashes(self):
790 self.sharehash_leaves = [None] * len(self.blockhashes)
791 self._status.set_status("Building and pushing block hash tree")
792 for shnum, blockhashes in self.blockhashes.iteritems():
793 t = hashtree.HashTree(blockhashes)
794 self.blockhashes[shnum] = list(t)
795 # set the leaf for future use.
796 self.sharehash_leaves[shnum] = t[0]
798 writers = self.writers[shnum]
799 for writer in writers:
800 writer.put_blockhashes(self.blockhashes[shnum])
803 def push_sharehashes(self):
804 self._status.set_status("Building and pushing share hash chain")
805 share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
806 for shnum in xrange(len(self.sharehash_leaves)):
807 needed_indices = share_hash_tree.needed_hashes(shnum)
808 self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
809 for i in needed_indices] )
810 writers = self.writers[shnum]
811 for writer in writers:
812 writer.put_sharehashes(self.sharehashes[shnum])
813 self.root_hash = share_hash_tree[0]
816 def push_toplevel_hashes_and_signature(self):
817 # We need to do three things here:
818 # - Push the root hash and salt hash
819 # - Get the checkstring of the resulting layout; sign that.
820 # - Push the signature
821 self._status.set_status("Pushing root hashes and signature")
822 for shnum in xrange(self.total_shares):
823 writers = self.writers[shnum]
824 for writer in writers:
825 writer.put_root_hash(self.root_hash)
826 self._update_checkstring()
827 self._make_and_place_signature()
830 def _update_checkstring(self):
832 After putting the root hash, MDMF files will have the
833 checkstring written to the storage server. This means that we
834 can update our copy of the checkstring so we can detect
835 uncoordinated writes. SDMF files will have the same checkstring,
836 so we need not do anything.
838 self._checkstring = self.writers.values()[0][0].get_checkstring()
841 def _make_and_place_signature(self):
843 I create and place the signature.
845 started = time.time()
846 self._status.set_status("Signing prefix")
847 signable = self.writers.values()[0][0].get_signable()
848 self.signature = self._privkey.sign(signable)
850 for (shnum, writers) in self.writers.iteritems():
851 for writer in writers:
852 writer.put_signature(self.signature)
853 self._status.timings['sign'] = time.time() - started
856 def finish_publishing(self):
857 # We're almost done -- we just need to put the verification key
859 started = time.time()
860 self._status.set_status("Pushing shares")
861 self._started_pushing = started
863 verification_key = self._pubkey.serialize()
865 for (shnum, writers) in self.writers.copy().iteritems():
866 for writer in writers:
867 writer.put_verification_key(verification_key)
868 self.num_outstanding += 1
869 def _no_longer_outstanding(res):
870 self.num_outstanding -= 1
873 d = writer.finish_publishing()
874 d.addBoth(_no_longer_outstanding)
875 d.addErrback(self._connection_problem, writer)
876 d.addCallback(self._got_write_answer, writer, started)
878 self._record_verinfo()
879 self._status.timings['pack'] = time.time() - started
880 return defer.DeferredList(ds)
883 def _record_verinfo(self):
884 self.versioninfo = self.writers.values()[0][0].get_verinfo()
887 def _connection_problem(self, f, writer):
889 We ran into a connection problem while working with writer, and
890 need to deal with that.
892 self.log("found problem: %s" % str(f))
893 self._last_failure = f
894 self.writers[writer.shnum].remove(writer)
897 def log_goal(self, goal, message=""):
899 for (shnum, server) in sorted([(s,p) for (p,s) in goal]):
900 logmsg.append("sh%d to [%s]" % (shnum, server.get_name()))
901 self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
902 self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
905 def update_goal(self):
906 # if log.recording_noisy
908 self.log_goal(self.goal, "before update: ")
910 # first, remove any bad servers from our goal
911 self.goal = set([ (server, shnum)
912 for (server, shnum) in self.goal
913 if server not in self.bad_servers ])
915 # find the homeless shares:
916 homefull_shares = set([shnum for (server, shnum) in self.goal])
917 homeless_shares = set(range(self.total_shares)) - homefull_shares
918 homeless_shares = sorted(list(homeless_shares))
919 # place them somewhere. We prefer unused servers at the beginning of
920 # the available server list.
922 if not homeless_shares:
925 # if an old share X is on a node, put the new share X there too.
926 # TODO: 1: redistribute shares to achieve one-per-server, by copying
927 # shares from existing servers to new (less-crowded) ones. The
928 # old shares must still be updated.
929 # TODO: 2: move those shares instead of copying them, to reduce future
932 # this is a bit CPU intensive but easy to analyze. We create a sort
933 # order for each server. If a server is marked as bad, we don't
934 # even put it in the list. Then we care about the number of shares
935 # which have already been assigned to it. After that we care about
936 # its permutation order.
937 old_assignments = DictOfSets()
938 for (server, shnum) in self.goal:
939 old_assignments.add(server, shnum)
942 for i, server in enumerate(self.full_serverlist):
943 serverid = server.get_serverid()
944 if server in self.bad_servers:
946 entry = (len(old_assignments.get(server, [])), i, serverid, server)
947 serverlist.append(entry)
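# Added note (illustration, not from the original source): these entries
# are built so that a lexicographic sort orders servers by how many shares
# they already hold (fewest first), breaking ties by their position in the
# permuted server list. E.g. an entry (0, 7, ...) for an empty server sorts
# ahead of (2, 1, ...) for a server already holding two shares, even though
# the empty server appears later in the permutation.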
951 raise NotEnoughServersError("Ran out of non-bad servers, "
953 str(self._first_write_error),
954 self._first_write_error)
956 # we then index this serverlist with an integer, because we may have
957 # to wrap. We update the goal as we go.
959 for shnum in homeless_shares:
960 (ignored1, ignored2, ignored3, server) = serverlist[i]
961 # if we are forced to send a share to a server that already has
962 # one, we may have two write requests in flight, and the
963 # servermap (which was computed before either request was sent)
964 # won't reflect the new shares, so the second response will be
965 # surprising. There is code in _got_write_answer() to tolerate
966 # this, otherwise it would cause the publish to fail with an
967 # UncoordinatedWriteError. See #546 for details of the trouble
968 # this used to cause.
969 self.goal.add( (server, shnum) )
971 if i >= len(serverlist):
974 self.log_goal(self.goal, "after update: ")
977 def _got_write_answer(self, answer, writer, started):
979 # SDMF writers only pretend to write when readers set their
980 # blocks, salts, and so on -- they actually just write once,
981 # at the end of the upload process. In fake writes, they
982 # return defer.succeed(None). If we see that, we shouldn't
983 # bother checking it.
986 server = writer.server
987 lp = self.log("_got_write_answer from %s, share %d" %
988 (server.get_name(), writer.shnum))
991 elapsed = now - started
993 self._status.add_per_server_time(server, elapsed)
995 wrote, read_data = answer
997 surprise_shares = set(read_data.keys()) - set([writer.shnum])
999 # We need to remove from surprise_shares any shares that we are
1000 # knowingly also writing to that server from other writers.
1002 # TODO: Precompute this.
1004 for shnum, writers in self.writers.iteritems():
1005 shares.extend([x.shnum for x in writers if x.server == server])
1006 known_shnums = set(shares)
1007 surprise_shares -= known_shnums
1008 self.log("found the following surprise shares: %s" %
1009 str(surprise_shares))
1011 # Now surprise shares contains all of the shares that we did not
1012 # expect to be there.
1015 for shnum in surprise_shares:
1016 # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
1017 checkstring = read_data[shnum][0]
1018 # What we want to do here is to see if their (seqnum,
1019 # roothash, salt) is the same as our (seqnum, roothash,
1020 # salt), or the equivalent for MDMF. The best way to do this
1021 # is to store a packed representation of our checkstring
1022 # somewhere, then not bother unpacking the other
1024 if checkstring == self._checkstring:
1025 # they have the right share, somehow
1027 if (server,shnum) in self.goal:
1028 # and we want them to have it, so we probably sent them a
1029 # copy in an earlier write. This is ok, and avoids the
1033 # They aren't in our goal, but they are still for the right
1034 # version. Somebody else wrote them, and it's a convergent
1035 # uncoordinated write. Pretend this is ok (don't be
1036 # surprised), since I suspect there's a decent chance that
1037 # we'll hit this in normal operation.
1041 # the new shares are of a different version
1042 if server in self._servermap.get_reachable_servers():
1043 # we asked them about their shares, so we had knowledge
1044 # of what they used to have. Any surprising shares must
1045 # have come from someone else, so UCW.
1048 # we didn't ask them, and now we've discovered that they
1049 # have a share we didn't know about. This indicates that
1050 # mapupdate should have worked harder and asked more
1051 # servers before concluding that it knew about them all.
1053 # signal UCW, but make sure to ask this server next time,
1054 # so we'll remember to update it if/when we retry.
1056 # TODO: ask this server next time. I don't yet have a good
1057 # way to do this. Two insufficient possibilities are:
1059 # self._servermap.add_new_share(server, shnum, verinfo, now)
1060 # but that requires fetching/validating/parsing the whole
1061 # version string, and all we have is the checkstring
1062 # self._servermap.mark_bad_share(server, shnum, checkstring)
1063 # that will make publish overwrite the share next time,
1064 # but it won't re-query the server, and it won't make
1065 # mapupdate search further
1067 # TODO later: when publish starts, do
1068 # servermap.get_best_version(), extract the seqnum,
1069 # subtract one, and store as highest-replaceable-seqnum.
1070 # Then, if this surprise-because-we-didn't-ask share is
1071 # of highest-replaceable-seqnum or lower, we're allowed
1072 # to replace it: send out a new writev (or rather add it
1073 # to self.goal and loop).
1079 self.log("they had shares %s that we didn't know about" %
1080 (list(surprise_shares),),
1081 parent=lp, level=log.WEIRD, umid="un9CSQ")
1082 self.surprised = True
1085 # TODO: there are two possibilities. The first is that the server
1086 # is full (or just doesn't want to give us any room), which means
1087 # we shouldn't ask them again, but is *not* an indication of an
1088 # uncoordinated write. The second is that our testv failed, which
1089 # *does* indicate an uncoordinated write. We currently don't have
1090 # a way to tell these two apart (in fact, the storage server code
1091 # doesn't have the option of refusing our share).
1093 # If the server is full, mark the server as bad (so we don't ask
1094 # them again), but don't set self.surprised. The loop() will find
1097 # If the testv failed, log it, set self.surprised, but don't
1098 # bother adding to self.bad_servers .
1100 self.log("our testv failed, so the write did not happen",
1101 parent=lp, level=log.WEIRD, umid="8sc26g")
1102 self.surprised = True
1103 self.bad_servers.add(server) # don't ask them again
1104 # use the checkstring to add information to the log message
1105 unknown_format = False
1106 for (shnum,readv) in read_data.items():
1107 checkstring = readv[0]
1108 version = get_version_from_checkstring(checkstring)
1109 if version == MDMF_VERSION:
1111 other_roothash) = unpack_mdmf_checkstring(checkstring)
1112 elif version == SDMF_VERSION:
1115 other_IV) = unpack_sdmf_checkstring(checkstring)
1117 unknown_format = True
1118 expected_version = self._servermap.version_on_server(server,
1120 if expected_version:
1121 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
1122 offsets_tuple) = expected_version
1123 msg = ("somebody modified the share on us:"
1124 " shnum=%d: I thought they had #%d:R=%s," %
1126 seqnum, base32.b2a(root_hash)[:4]))
1128 msg += (" but I don't know how to read share"
1129 " format %d" % version)
1131 msg += " but testv reported #%d:R=%s" % \
1132 (other_seqnum, other_roothash)
1133 self.log(msg, parent=lp, level=log.NOISY)
1134 # if expected_version==None, then we didn't expect to see a
1135 # share on that server, and the 'surprise_shares' clause
1136 # above will have logged it.
1139 # and update the servermap
1140 # self.versioninfo is set during the last phase of publishing.
1141 # If we get there, we know that responses correspond to placed
1142 # shares, and can safely execute these statements.
1143 if self.versioninfo:
1144 self.log("wrote successfully: adding new share to servermap")
1145 self._servermap.add_new_share(server, writer.shnum,
1146 self.versioninfo, started)
1147 self.placed.add( (server, writer.shnum) )
1148 self._update_status()
1149 # the next method in the deferred chain will check to see if
1150 # we're done and successful.
1155 if not self._running:
1157 self._running = False
1159 self._status.timings["total"] = now - self._started
1161 elapsed = now - self._started_pushing
1162 self._status.timings['push'] = elapsed
1164 self._status.set_active(False)
1165 self.log("Publish done, success")
1166 self._status.set_status("Finished")
1167 self._status.set_progress(1.0)
1168 # Get k and segsize, then give them to the caller.
1170 hints['segsize'] = self.segment_size
1171 hints['k'] = self.required_shares
1172 self._node.set_downloader_hints(hints)
1173 eventually(self.done_deferred.callback, None)
1175 def _failure(self, f=None):
1177 self._last_failure = f
1179 if not self.surprised:
1180 # We ran out of servers
1181 msg = "Publish ran out of good servers"
1182 if self._last_failure:
1183 msg += ", last failure was: %s" % str(self._last_failure)
1185 e = NotEnoughServersError(msg)
1188 # We ran into shares that we didn't recognize, which means
1189 # that we need to return an UncoordinatedWriteError.
1190 self.log("Publish failed with UncoordinatedWriteError")
1191 e = UncoordinatedWriteError()
1192 f = failure.Failure(e)
1193 eventually(self.done_deferred.callback, f)
1196 class MutableFileHandle:
1198 I am a mutable uploadable built around a filehandle-like object,
1199 usually either a StringIO instance or a handle to an actual file.
1201 implements(IMutableUploadable)
1203 def __init__(self, filehandle):
1204 # The filehandle is defined as a generally file-like object that
1205 # has these two methods. We don't care beyond that.
1206 assert hasattr(filehandle, "read")
1207 assert hasattr(filehandle, "close")
1209 self._filehandle = filehandle
1210 # We must start reading at the beginning of the file, or we risk
1211 # encountering errors when the data read does not match the size
1212 # reported to the uploader.
1213 self._filehandle.seek(0)
1215 # We have not yet read anything, so our position is 0.
1221 I return the amount of data in my filehandle.
1223 if not hasattr(self, "_size"):
1224 old_position = self._filehandle.tell()
1225 # Seek to the end of the file by seeking 0 bytes from the
1227 self._filehandle.seek(0, 2) # 2 == os.SEEK_END in 2.5+
1228 self._size = self._filehandle.tell()
1229 # Restore the previous position, in case this was called
1231 self._filehandle.seek(old_position)
1232 assert self._filehandle.tell() == old_position
1234 assert hasattr(self, "_size")
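# Worked example (added for illustration): for a filehandle positioned at
# byte 2 of a 5-byte file, tell() returns 2, seek(0, 2) moves to the end,
# tell() then reports the size (5), and the final seek(2) restores the
# original position so a later read() is unaffected.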
1240 I return the position of my read marker -- i.e., how much data I
1241 have already read and returned to callers.
1246 def read(self, length):
1248 I return some data (up to length bytes) from my filehandle.
1250 In most cases, I return length bytes, but sometimes I won't --
1251 for example, if I am asked to read beyond the end of a file, or
1254 results = self._filehandle.read(length)
1255 self._marker += len(results)
1261 I close the underlying filehandle. Any further operations on the
1262 filehandle fail at this point.
1264 self._filehandle.close()
1267 class MutableData(MutableFileHandle):
1269 I am a mutable uploadable built around a string, which I then cast
1270 into a StringIO and treat as a filehandle.
1273 def __init__(self, s):
1274 # Take a string and return a file-like uploadable.
1275 assert isinstance(s, str)
1277 MutableFileHandle.__init__(self, StringIO(s))
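# Illustrative usage (added; not part of the original source):
#
#   u = MutableData("hello world")
#   u.get_size()   # 11
#   u.read(5)      # the bytes "hello" (possibly wrapped in a list, per the
#                  # "why return a list?" note earlier in this file)
#   u.pos()        # 5, the read marker after that read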
1280 class TransformingUploadable:
1282 I am an IMutableUploadable that wraps another IMutableUploadable,
1283 and some segments that are already on the grid. When I am called to
1284 read, I handle merging of boundary segments.
1286 implements(IMutableUploadable)
1289 def __init__(self, data, offset, segment_size, start, end):
1290 assert IMutableUploadable.providedBy(data)
1292 self._newdata = data
1293 self._offset = offset
1294 self._segment_size = segment_size
1298 self._read_marker = 0
1300 self._first_segment_offset = offset % segment_size
1302 num = self.log("TransformingUploadable: starting", parent=None)
1303 self._log_number = num
1304 self.log("got fso: %d" % self._first_segment_offset)
1305 self.log("got offset: %d" % self._offset)
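# Worked example (added for illustration): replacing data at offset=200000
# with segment_size=131073 gives _first_segment_offset == 200000 % 131073
# == 68927, meaning the first 68927 bytes of the affected segment must be
# re-read from the old share data and merged in front of the new data.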
1308 def log(self, *args, **kwargs):
1309 if 'parent' not in kwargs:
1310 kwargs['parent'] = self._log_number
1311 if "facility" not in kwargs:
1312 kwargs["facility"] = "tahoe.mutable.transforminguploadable"
1313 return log.msg(*args, **kwargs)
1317 return self._offset + self._newdata.get_size()
1320 def read(self, length):
1321 # We can get data from 3 sources here.
1322 # 1. The first of the segments provided to us.
1323 # 2. The data that we're replacing things with.
1324 # 3. The last of the segments provided to us.
1326 # are we in state 0?
1327 self.log("reading %d bytes" % length)
1330 old_data_length = self._first_segment_offset - self._read_marker
1331 if old_data_length > 0:
1332 if old_data_length > length:
1333 old_data_length = length
1334 self.log("returning %d bytes of old start data" % old_data_length)
1336 old_data_end = old_data_length + self._read_marker
1337 old_start_data = self._start[self._read_marker:old_data_end]
1338 length -= old_data_length
1340 # otherwise calculations later get screwed up.
1343 # Is there enough new data to satisfy this read? If not, we need
1344 # to pad the end of the data with data from our last segment.
1345 old_end_length = length - \
1346 (self._newdata.get_size() - self._newdata.pos())
1348 if old_end_length > 0:
1349 self.log("reading %d bytes of old end data" % old_end_length)
1351 # TODO: We're not explicitly checking for tail segment size
1352 # here. Is that a problem?
1353 old_data_offset = (length - old_end_length + \
1354 old_data_length) % self._segment_size
1355 self.log("reading at offset %d" % old_data_offset)
1356 old_end = old_data_offset + old_end_length
1357 old_end_data = self._end[old_data_offset:old_end]
1358 length -= old_end_length
1359 assert length == self._newdata.get_size() - self._newdata.pos()
1361 self.log("reading %d bytes of new data" % length)
1362 new_data = self._newdata.read(length)
1363 new_data = "".join(new_data)
1365 self._read_marker += len(old_start_data + new_data + old_end_data)
1367 return old_start_data + new_data + old_end_data
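# Worked example of the three-source merge above (added for illustration):
# suppose segment_size=10, the replacement starts at offset 13 (so
# _first_segment_offset == 3), the old starting segment begins "ABCDEFGHIJ",
# and the new data is "xyz". A read(10) then returns "ABC" from the old
# start segment, "xyz" from the new data, and 4 bytes taken from offset 6
# of the old end segment, so the rebuilt segment stays correctly aligned.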