4 from zope.interface import implements
5 from twisted.internet import defer, reactor
6 from foolscap.api import eventually
7 from allmydata.interfaces import IMutableFileNode, ICheckable, ICheckResults, \
8 NotEnoughSharesError, MDMF_VERSION, SDMF_VERSION, IMutableUploadable, \
9 IMutableFileVersion, IWriteable
10 from allmydata.util import hashutil, log, consumer, deferredutil, mathutil
11 from allmydata.util.assertutil import precondition
12 from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI, \
13 WriteableMDMFFileURI, ReadonlyMDMFFileURI
14 from allmydata.monitor import Monitor
15 from pycryptopp.cipher.aes import AES
17 from allmydata.mutable.publish import Publish, MutableData,\
18 TransformingUploadable
19 from allmydata.mutable.common import MODE_READ, MODE_WRITE, MODE_CHECK, UnrecoverableFileError, \
20 UncoordinatedWriteError
21 from allmydata.mutable.servermap import ServerMap, ServermapUpdater
22 from allmydata.mutable.retrieve import Retrieve
23 from allmydata.mutable.checker import MutableChecker, MutableCheckAndRepairer
24 from allmydata.mutable.repairer import Repairer
28 # these parameters are copied from foolscap.reconnector, which gets them
29 # from twisted.internet.protocol.ReconnectingClientFactory
31 factor = 2.7182818284590451 # (math.e)
32 jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
36 self._delay = self.initialDelay
38 def delay(self, node, f):
42 self._delay = self._delay * self.factor
43 self._delay = random.normalvariate(self._delay,
44 self._delay * self.jitter)
46 reactor.callLater(self._delay, d.callback, None)
49 # use nodemaker.create_mutable_file() to make one of these
51 class MutableFileNode:
52 implements(IMutableFileNode, ICheckable)
54 def __init__(self, storage_broker, secret_holder,
55 default_encoding_parameters, history):
56 self._storage_broker = storage_broker
57 self._secret_holder = secret_holder
58 self._default_encoding_parameters = default_encoding_parameters
59 self._history = history
60 self._pubkey = None # filled in upon first read
61 self._privkey = None # filled in if we're mutable
62 # we keep track of the last encoding parameters that we use. These
63 # are updated upon retrieve, and used by publish. If we publish
64 # without ever reading (i.e. overwrite()), then we use these values.
65 self._required_shares = default_encoding_parameters["k"]
66 self._total_shares = default_encoding_parameters["n"]
67 self._sharemap = {} # known shares, shnum-to-[nodeids]
68 self._most_recent_size = None
69 # filled in after __init__ if we're being created for the first time;
70 # filled in by the servermap updater before publishing, otherwise.
71 # set to this default value in case neither of those things happen,
72 # or in case the servermap can't find any shares to tell us what
74 self._protocol_version = None
76 # all users of this MutableFileNode go through the serializer. This
77 # takes advantage of the fact that Deferreds discard the callbacks
78 # that they're done with, so we can keep using the same Deferred
79 # forever without consuming more and more memory.
80 self._serializer = defer.succeed(None)
82 # Starting with MDMF, we can get these from caps if they're
83 # there. Leave them alone for now; they'll be filled in by my
84 # init_from_cap method if necessary.
85 self._downloader_hints = {}
88 if hasattr(self, '_uri'):
89 return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
91 return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)
93 def init_from_cap(self, filecap):
94 # we have the URI, but we have not yet retrieved the public
95 # verification key, nor things like 'k' or 'N'. If and when someone
96 # wants to get our contents, we'll pull from shares and fill those
98 if isinstance(filecap, (WriteableMDMFFileURI, ReadonlyMDMFFileURI)):
99 self._protocol_version = MDMF_VERSION
100 elif isinstance(filecap, (ReadonlySSKFileURI, WriteableSSKFileURI)):
101 self._protocol_version = SDMF_VERSION
104 self._writekey = None
106 if not filecap.is_readonly() and filecap.is_mutable():
107 self._writekey = self._uri.writekey
108 self._readkey = self._uri.readkey
109 self._storage_index = self._uri.storage_index
110 self._fingerprint = self._uri.fingerprint
111 # the following values are learned during Retrieval
113 # self._required_shares
115 # and these are needed for Publish. They are filled in by Retrieval
116 # if possible, otherwise by the first peer that Publish talks to.
118 self._encprivkey = None
122 def create_with_keys(self, (pubkey, privkey), contents,
123 version=SDMF_VERSION):
124 """Call this to create a brand-new mutable file. It will create the
125 shares, find homes for them, and upload the initial contents (created
126 with the same rules as IClient.create_mutable_file() ). Returns a
127 Deferred that fires (with the MutableFileNode instance you should
128 use) when it completes.
130 self._pubkey, self._privkey = pubkey, privkey
131 pubkey_s = self._pubkey.serialize()
132 privkey_s = self._privkey.serialize()
133 self._writekey = hashutil.ssk_writekey_hash(privkey_s)
134 self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
135 self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
136 if version == MDMF_VERSION:
137 self._uri = WriteableMDMFFileURI(self._writekey, self._fingerprint)
138 self._protocol_version = version
139 elif version == SDMF_VERSION:
140 self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
141 self._protocol_version = version
142 self._readkey = self._uri.readkey
143 self._storage_index = self._uri.storage_index
144 initial_contents = self._get_initial_contents(contents)
145 return self._upload(initial_contents, None)
147 def _get_initial_contents(self, contents):
149 return MutableData("")
151 if isinstance(contents, str):
152 return MutableData(contents)
154 if IMutableUploadable.providedBy(contents):
157 assert callable(contents), "%s should be callable, not %s" % \
158 (contents, type(contents))
159 return contents(self)
161 def _encrypt_privkey(self, writekey, privkey):
163 crypttext = enc.process(privkey)
166 def _decrypt_privkey(self, enc_privkey):
167 enc = AES(self._writekey)
168 privkey = enc.process(enc_privkey)
171 def _populate_pubkey(self, pubkey):
172 self._pubkey = pubkey
173 def _populate_required_shares(self, required_shares):
174 self._required_shares = required_shares
175 def _populate_total_shares(self, total_shares):
176 self._total_shares = total_shares
178 def _populate_privkey(self, privkey):
179 self._privkey = privkey
180 def _populate_encprivkey(self, encprivkey):
181 self._encprivkey = encprivkey
183 def get_write_enabler(self, server):
184 seed = server.get_foolscap_write_enabler_seed()
185 assert len(seed) == 20
186 return hashutil.ssk_write_enabler_hash(self._writekey, seed)
187 def get_renewal_secret(self, server):
188 crs = self._secret_holder.get_renewal_secret()
189 frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
190 lease_seed = server.get_lease_seed()
191 assert len(lease_seed) == 20
192 return hashutil.bucket_renewal_secret_hash(frs, lease_seed)
193 def get_cancel_secret(self, server):
194 ccs = self._secret_holder.get_cancel_secret()
195 fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
196 lease_seed = server.get_lease_seed()
197 assert len(lease_seed) == 20
198 return hashutil.bucket_cancel_secret_hash(fcs, lease_seed)
200 def get_writekey(self):
201 return self._writekey
202 def get_readkey(self):
204 def get_storage_index(self):
205 return self._storage_index
206 def get_fingerprint(self):
207 return self._fingerprint
208 def get_privkey(self):
210 def get_encprivkey(self):
211 return self._encprivkey
212 def get_pubkey(self):
215 def get_required_shares(self):
216 return self._required_shares
217 def get_total_shares(self):
218 return self._total_shares
220 ####################################
224 return self._most_recent_size
226 def get_current_size(self):
227 d = self.get_size_of_best_version()
228 d.addCallback(self._stash_size)
231 def _stash_size(self, size):
232 self._most_recent_size = size
237 def get_readcap(self):
238 return self._uri.get_readonly()
239 def get_verify_cap(self):
240 return self._uri.get_verify_cap()
241 def get_repair_cap(self):
242 if self._uri.is_readonly():
247 return self._uri.to_string()
249 def get_write_uri(self):
250 if self.is_readonly():
252 return self._uri.to_string()
254 def get_readonly_uri(self):
255 return self._uri.get_readonly().to_string()
257 def get_readonly(self):
258 if self.is_readonly():
260 ro = MutableFileNode(self._storage_broker, self._secret_holder,
261 self._default_encoding_parameters, self._history)
262 ro.init_from_cap(self._uri.get_readonly())
265 def is_mutable(self):
266 return self._uri.is_mutable()
268 def is_readonly(self):
269 return self._uri.is_readonly()
271 def is_unknown(self):
274 def is_allowed_in_immutable_directory(self):
275 return not self._uri.is_mutable()
277 def raise_error(self):
281 return hash((self.__class__, self._uri))
282 def __cmp__(self, them):
283 if cmp(type(self), type(them)):
284 return cmp(type(self), type(them))
285 if cmp(self.__class__, them.__class__):
286 return cmp(self.__class__, them.__class__)
287 return cmp(self._uri, them._uri)
290 #################################
293 def check(self, monitor, verify=False, add_lease=False):
294 checker = MutableChecker(self, self._storage_broker,
295 self._history, monitor)
296 return checker.check(verify, add_lease)
298 def check_and_repair(self, monitor, verify=False, add_lease=False):
299 checker = MutableCheckAndRepairer(self, self._storage_broker,
300 self._history, monitor)
301 return checker.check(verify, add_lease)
303 #################################
306 def repair(self, check_results, force=False, monitor=None):
307 assert ICheckResults(check_results)
308 r = Repairer(self, check_results, self._storage_broker,
309 self._history, monitor)
314 #################################
317 def get_best_readable_version(self):
319 I return a Deferred that fires with a MutableFileVersion
320 representing the best readable version of the file that I
323 return self.get_readable_version()
326 def get_readable_version(self, servermap=None, version=None):
328 I return a Deferred that fires with an MutableFileVersion for my
329 version argument, if there is a recoverable file of that version
330 on the grid. If there is no recoverable version, I fire with an
331 UnrecoverableFileError.
333 If a servermap is provided, I look in there for the requested
334 version. If no servermap is provided, I create and update a new
337 If no version is provided, then I return a MutableFileVersion
338 representing the best recoverable version of the file.
340 d = self._get_version_from_servermap(MODE_READ, servermap, version)
341 def _build_version((servermap, their_version)):
342 assert their_version in servermap.recoverable_versions()
343 assert their_version in servermap.make_versionmap()
345 mfv = MutableFileVersion(self,
349 self._storage_broker,
351 history=self._history)
352 assert mfv.is_readonly()
353 mfv.set_downloader_hints(self._downloader_hints)
354 # our caller can use this to download the contents of the
357 return d.addCallback(_build_version)
360 def _get_version_from_servermap(self,
365 I return a Deferred that fires with (servermap, version).
367 This function performs validation and a servermap update. If it
368 returns (servermap, version), the caller can assume that:
369 - servermap was last updated in mode.
370 - version is recoverable, and corresponds to the servermap.
372 If version and servermap are provided to me, I will validate
373 that version exists in the servermap, and that the servermap was
376 If version is not provided, but servermap is, I will validate
377 the servermap and return the best recoverable version that I can
378 find in the servermap.
380 If the version is provided but the servermap isn't, I will
381 obtain a servermap that has been updated in the correct mode and
382 validate that version is found and recoverable.
384 If neither servermap nor version are provided, I will obtain a
385 servermap updated in the correct mode, and return the best
386 recoverable version that I can find in there.
389 if servermap and servermap.get_last_update()[0] == mode:
390 d = defer.succeed(servermap)
392 d = self._get_servermap(mode)
394 def _get_version(servermap, v):
395 if v and v not in servermap.recoverable_versions():
398 v = servermap.best_recoverable_version()
400 raise UnrecoverableFileError("no recoverable versions")
402 return (servermap, v)
403 return d.addCallback(_get_version, version)
406 def download_best_version(self, progress=None):
408 I return a Deferred that fires with the contents of the best
409 version of this mutable file.
411 return self._do_serialized(self._download_best_version, progress=progress)
414 def _download_best_version(self, progress=None):
416 I am the serialized sibling of download_best_version.
418 d = self.get_best_readable_version()
419 d.addCallback(self._record_size)
420 d.addCallback(lambda version: version.download_to_data(progress=progress))
422 # It is possible that the download will fail because there
423 # aren't enough shares to be had. If so, we will try again after
424 # updating the servermap in MODE_WRITE, which may find more
425 # shares than updating in MODE_READ, as we just did. We can do
426 # this by getting the best mutable version and downloading from
427 # that -- the best mutable version will be a MutableFileVersion
428 # with a servermap that was last updated in MODE_WRITE, as we
429 # want. If this fails, then we give up.
430 def _maybe_retry(failure):
431 failure.trap(NotEnoughSharesError)
433 d = self.get_best_mutable_version()
434 d.addCallback(self._record_size)
435 d.addCallback(lambda version: version.download_to_data(progress=progress))
438 d.addErrback(_maybe_retry)
442 def _record_size(self, mfv):
444 I record the size of a mutable file version.
446 self._most_recent_size = mfv.get_size()
450 def get_size_of_best_version(self):
452 I return the size of the best version of this mutable file.
454 This is equivalent to calling get_size() on the result of
455 get_best_readable_version().
457 d = self.get_best_readable_version()
458 return d.addCallback(lambda mfv: mfv.get_size())
461 #################################
464 def get_best_mutable_version(self, servermap=None):
466 I return a Deferred that fires with a MutableFileVersion
467 representing the best readable version of the file that I
468 represent. I am like get_best_readable_version, except that I
469 will try to make a writeable version if I can.
471 return self.get_mutable_version(servermap=servermap)
474 def get_mutable_version(self, servermap=None, version=None):
476 I return a version of this mutable file. I return a Deferred
477 that fires with a MutableFileVersion
479 If version is provided, the Deferred will fire with a
480 MutableFileVersion initailized with that version. Otherwise, it
481 will fire with the best version that I can recover.
483 If servermap is provided, I will use that to find versions
484 instead of performing my own servermap update.
486 if self.is_readonly():
487 return self.get_readable_version(servermap=servermap,
490 # get_mutable_version => write intent, so we require that the
491 # servermap is updated in MODE_WRITE
492 d = self._get_version_from_servermap(MODE_WRITE, servermap, version)
493 def _build_version((servermap, smap_version)):
494 # these should have been set by the servermap update.
495 assert self._secret_holder
496 assert self._writekey
498 mfv = MutableFileVersion(self,
502 self._storage_broker,
506 history=self._history)
507 assert not mfv.is_readonly()
508 mfv.set_downloader_hints(self._downloader_hints)
511 return d.addCallback(_build_version)
514 # XXX: I'm uncomfortable with the difference between upload and
515 # overwrite, which, FWICT, is basically that you don't have to
516 # do a servermap update before you overwrite. We split them up
517 # that way anyway, so I guess there's no real difficulty in
518 # offering both ways to callers, but it also makes the
519 # public-facing API cluttery, and makes it hard to discern the
520 # right way of doing things.
522 # In general, we leave it to callers to ensure that they aren't
523 # going to cause UncoordinatedWriteErrors when working with
524 # MutableFileVersions. We know that the next three operations
525 # (upload, overwrite, and modify) will all operate on the same
526 # version, so we say that only one of them can be going on at once,
527 # and serialize them to ensure that that actually happens, since as
528 # the caller in this situation it is our job to do that.
529 def overwrite(self, new_contents):
531 I overwrite the contents of the best recoverable version of this
532 mutable file with new_contents. This is equivalent to calling
533 overwrite on the result of get_best_mutable_version with
534 new_contents as an argument. I return a Deferred that eventually
535 fires with the results of my replacement process.
537 # TODO: Update downloader hints.
538 return self._do_serialized(self._overwrite, new_contents)
541 def _overwrite(self, new_contents):
543 I am the serialized sibling of overwrite.
545 d = self.get_best_mutable_version()
546 d.addCallback(lambda mfv: mfv.overwrite(new_contents))
547 d.addCallback(self._did_upload, new_contents.get_size())
551 def upload(self, new_contents, servermap):
553 I overwrite the contents of the best recoverable version of this
554 mutable file with new_contents, using servermap instead of
555 creating/updating our own servermap. I return a Deferred that
556 fires with the results of my upload.
558 # TODO: Update downloader hints
559 return self._do_serialized(self._upload, new_contents, servermap)
562 def modify(self, modifier, backoffer=None):
564 I modify the contents of the best recoverable version of this
565 mutable file with the modifier. This is equivalent to calling
566 modify on the result of get_best_mutable_version. I return a
567 Deferred that eventually fires with an UploadResults instance
568 describing this process.
570 # TODO: Update downloader hints.
571 return self._do_serialized(self._modify, modifier, backoffer)
574 def _modify(self, modifier, backoffer):
576 I am the serialized sibling of modify.
578 d = self.get_best_mutable_version()
579 d.addCallback(lambda mfv: mfv.modify(modifier, backoffer))
583 def download_version(self, servermap, version, fetch_privkey=False):
585 Download the specified version of this mutable file. I return a
586 Deferred that fires with the contents of the specified version
587 as a bytestring, or errbacks if the file is not recoverable.
589 d = self.get_readable_version(servermap, version)
590 return d.addCallback(lambda mfv: mfv.download_to_data(fetch_privkey))
593 def get_servermap(self, mode):
595 I return a servermap that has been updated in mode.
597 mode should be one of MODE_READ, MODE_WRITE, MODE_CHECK or
598 MODE_ANYTHING. See servermap.py for more on what these mean.
600 return self._do_serialized(self._get_servermap, mode)
603 def _get_servermap(self, mode):
605 I am a serialized twin to get_servermap.
607 servermap = ServerMap()
608 d = self._update_servermap(servermap, mode)
609 # The servermap will tell us about the most recent size of the
610 # file, so we may as well set that so that callers might get
611 # more data about us.
612 if not self._most_recent_size:
613 d.addCallback(self._get_size_from_servermap)
617 def _get_size_from_servermap(self, servermap):
619 I extract the size of the best version of this file and record
620 it in self._most_recent_size. I return the servermap that I was
623 if servermap.recoverable_versions():
624 v = servermap.best_recoverable_version()
625 size = v[4] # verinfo[4] == size
626 self._most_recent_size = size
630 def _update_servermap(self, servermap, mode):
631 u = ServermapUpdater(self, self._storage_broker, Monitor(), servermap,
634 self._history.notify_mapupdate(u.get_status())
638 #def set_version(self, version):
639 # I can be set in two ways:
640 # 1. When the node is created.
641 # 2. (for an existing share) when the Servermap is updated
643 # assert version in (MDMF_VERSION, SDMF_VERSION)
644 # self._protocol_version = version
647 def get_version(self):
648 return self._protocol_version
651 def _do_serialized(self, cb, *args, **kwargs):
652 # note: to avoid deadlock, this callable is *not* allowed to invoke
653 # other serialized methods within this (or any other)
654 # MutableFileNode. The callable should be a bound method of this same
657 self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
658 # we need to put off d.callback until this Deferred is finished being
659 # processed. Otherwise the caller's subsequent activities (like,
660 # doing other things with this node) can cause reentrancy problems in
661 # the Deferred code itself
662 self._serializer.addBoth(lambda res: eventually(d.callback, res))
663 # add a log.err just in case something really weird happens, because
664 # self._serializer stays around forever, therefore we won't see the
665 # usual Unhandled Error in Deferred that would give us a hint.
666 self._serializer.addErrback(log.err)
670 def _upload(self, new_contents, servermap):
672 A MutableFileNode still has to have some way of getting
673 published initially, which is what I am here for. After that,
674 all publishing, updating, modifying and so on happens through
677 assert self._pubkey, "update_servermap must be called before publish"
679 # Define IPublishInvoker with a set_downloader_hints method?
680 # Then have the publisher call that method when it's done publishing?
681 p = Publish(self, self._storage_broker, servermap)
683 self._history.notify_publish(p.get_status(),
684 new_contents.get_size())
685 d = p.publish(new_contents)
686 d.addCallback(self._did_upload, new_contents.get_size())
690 def set_downloader_hints(self, hints):
691 self._downloader_hints = hints
693 def _did_upload(self, res, size):
694 self._most_recent_size = size
698 class MutableFileVersion:
700 I represent a specific version (most likely the best version) of a
703 Since I implement IReadable, instances which hold a
704 reference to an instance of me are guaranteed the ability (absent
705 connection difficulties or unrecoverable versions) to read the file
706 that I represent. Depending on whether I was initialized with a
707 write capability or not, I may also provide callers the ability to
708 overwrite or modify the contents of the mutable file that I
711 implements(IMutableFileVersion, IWriteable)
725 self._servermap = servermap
726 self._version = version
727 self._storage_index = storage_index
728 self._write_secrets = write_secrets
729 self._history = history
730 self._storage_broker = storage_broker
732 #assert isinstance(readcap, IURI)
733 self._readcap = readcap
735 self._writekey = writekey
736 self._serializer = defer.succeed(None)
739 def get_sequence_number(self):
741 Get the sequence number of the mutable version that I represent.
743 return self._version[0] # verinfo[0] == the sequence number
747 def get_writekey(self):
749 I return a writekey or None if I don't have a writekey.
751 return self._writekey
754 def set_downloader_hints(self, hints):
756 I set the downloader hints.
758 assert isinstance(hints, dict)
760 self._downloader_hints = hints
763 def get_downloader_hints(self):
765 I return the downloader hints.
767 return self._downloader_hints
770 def overwrite(self, new_contents):
772 I overwrite the contents of this mutable file version with the
773 data in new_contents.
775 assert not self.is_readonly()
777 return self._do_serialized(self._overwrite, new_contents)
780 def _overwrite(self, new_contents):
781 assert IMutableUploadable.providedBy(new_contents)
782 assert self._servermap.get_last_update()[0] == MODE_WRITE
784 return self._upload(new_contents)
787 def modify(self, modifier, backoffer=None):
788 """I use a modifier callback to apply a change to the mutable file.
789 I implement the following pseudocode::
791 obtain_mutable_filenode_lock()
794 update_servermap(MODE_WRITE)
795 old = retrieve_best_version()
796 new = modifier(old, servermap, first_time)
801 except UncoordinatedWriteError, e:
805 release_mutable_filenode_lock()
807 The idea is that your modifier function can apply a delta of some
808 sort, and it will be re-run as necessary until it succeeds. The
809 modifier must inspect the old version to see whether its delta has
810 already been applied: if so it should return the contents unmodified.
812 Note that the modifier is required to run synchronously, and must not
813 invoke any methods on this MutableFileNode instance.
815 The backoff-er is a callable that is responsible for inserting a
816 random delay between subsequent attempts, to help competing updates
817 from colliding forever. It is also allowed to give up after a while.
818 The backoffer is given two arguments: this MutableFileNode, and the
819 Failure object that contains the UncoordinatedWriteError. It should
820 return a Deferred that will fire when the next attempt should be
821 made, or return the Failure if the loop should give up. If
822 backoffer=None, a default one is provided which will perform
823 exponential backoff, and give up after 4 tries. Note that the
824 backoffer should not invoke any methods on this MutableFileNode
825 instance, and it needs to be highly conscious of deadlock issues.
827 assert not self.is_readonly()
829 return self._do_serialized(self._modify, modifier, backoffer)
832 def _modify(self, modifier, backoffer):
833 if backoffer is None:
834 backoffer = BackoffAgent().delay
835 return self._modify_and_retry(modifier, backoffer, True)
838 def _modify_and_retry(self, modifier, backoffer, first_time):
840 I try to apply modifier to the contents of this version of the
841 mutable file. If I succeed, I return an UploadResults instance
842 describing my success. If I fail, I try again after waiting for
845 log.msg("doing modify")
847 d = self._update_servermap()
849 # We ran into trouble; do MODE_CHECK so we're a little more
850 # careful on subsequent tries.
851 d = self._update_servermap(mode=MODE_CHECK)
853 d.addCallback(lambda ignored:
854 self._modify_once(modifier, first_time))
856 f.trap(UncoordinatedWriteError)
857 # Uh oh, it broke. We're allowed to trust the servermap for our
858 # first try, but after that we need to update it. It's
859 # possible that we've failed due to a race with another
860 # uploader, and if the race is to converge correctly, we
861 # need to know about that upload.
862 d2 = defer.maybeDeferred(backoffer, self, f)
863 d2.addCallback(lambda ignored:
864 self._modify_and_retry(modifier,
871 def _modify_once(self, modifier, first_time):
873 I attempt to apply a modifier to the contents of the mutable
876 assert self._servermap.get_last_update()[0] != MODE_READ
878 # download_to_data is serialized, so we have to call this to
880 d = self._try_to_download_data()
881 def _apply(old_contents):
882 new_contents = modifier(old_contents, self._servermap, first_time)
883 precondition((isinstance(new_contents, str) or
884 new_contents is None),
885 "Modifier function must return a string "
888 if new_contents is None or new_contents == old_contents:
889 log.msg("no changes")
890 # no changes need to be made
893 # However, since Publish is not automatically doing a
894 # recovery when it observes UCWE, we need to do a second
895 # publish. See #551 for details. We'll basically loop until
896 # we managed an uncontested publish.
897 old_uploadable = MutableData(old_contents)
898 new_contents = old_uploadable
900 new_contents = MutableData(new_contents)
902 return self._upload(new_contents)
903 d.addCallback(_apply)
907 def is_readonly(self):
909 I return True if this MutableFileVersion provides no write
910 access to the file that it encapsulates, and False if it
911 provides the ability to modify the file.
913 return self._writekey is None
916 def is_mutable(self):
918 I return True, since mutable files are always mutable by
924 def get_storage_index(self):
926 I return the storage index of the reference that I encapsulate.
928 return self._storage_index
933 I return the length, in bytes, of this readable object.
935 return self._servermap.size_of_version(self._version)
938 def download_to_data(self, fetch_privkey=False, progress=None):
940 I return a Deferred that fires with the contents of this
941 readable object as a byte string.
944 c = consumer.MemoryConsumer(progress=progress)
945 d = self.read(c, fetch_privkey=fetch_privkey)
946 d.addCallback(lambda mc: "".join(mc.chunks))
950 def _try_to_download_data(self):
952 I am an unserialized cousin of download_to_data; I am called
953 from the children of modify() to download the data associated
954 with this mutable version.
956 c = consumer.MemoryConsumer()
957 # modify will almost certainly write, so we need the privkey.
958 d = self._read(c, fetch_privkey=True)
959 d.addCallback(lambda mc: "".join(mc.chunks))
963 def read(self, consumer, offset=0, size=None, fetch_privkey=False):
965 I read a portion (possibly all) of the mutable file that I
966 reference into consumer.
968 return self._do_serialized(self._read, consumer, offset, size,
972 def _read(self, consumer, offset=0, size=None, fetch_privkey=False):
974 I am the serialized companion of read.
976 r = Retrieve(self._node, self._storage_broker, self._servermap,
977 self._version, fetch_privkey)
979 self._history.notify_retrieve(r.get_status())
980 d = r.download(consumer, offset, size)
984 def _do_serialized(self, cb, *args, **kwargs):
985 # note: to avoid deadlock, this callable is *not* allowed to invoke
986 # other serialized methods within this (or any other)
987 # MutableFileNode. The callable should be a bound method of this same
990 self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
991 # we need to put off d.callback until this Deferred is finished being
992 # processed. Otherwise the caller's subsequent activities (like,
993 # doing other things with this node) can cause reentrancy problems in
994 # the Deferred code itself
995 self._serializer.addBoth(lambda res: eventually(d.callback, res))
996 # add a log.err just in case something really weird happens, because
997 # self._serializer stays around forever, therefore we won't see the
998 # usual Unhandled Error in Deferred that would give us a hint.
999 self._serializer.addErrback(log.err)
1003 def _upload(self, new_contents):
1004 #assert self._pubkey, "update_servermap must be called before publish"
1005 p = Publish(self._node, self._storage_broker, self._servermap)
1007 self._history.notify_publish(p.get_status(),
1008 new_contents.get_size())
1009 d = p.publish(new_contents)
1010 d.addCallback(self._did_upload, new_contents.get_size())
1014 def _did_upload(self, res, size):
1015 self._most_recent_size = size
1018 def update(self, data, offset):
1020 Do an update of this mutable file version by inserting data at
1021 offset within the file. If offset is the EOF, this is an append
1022 operation. I return a Deferred that fires with the results of
1023 the update operation when it has completed.
1025 In cases where update does not append any data, or where it does
1026 not append so many blocks that the block count crosses a
1027 power-of-two boundary, this operation will use roughly
1028 O(data.get_size()) memory/bandwidth/CPU to perform the update.
1029 Otherwise, it must download, re-encode, and upload the entire
1030 file again, which will use O(filesize) resources.
1032 return self._do_serialized(self._update, data, offset)
1035 def _update(self, data, offset):
1037 I update the mutable file version represented by this particular
1038 IMutableVersion by inserting the data in data at the offset
1039 offset. I return a Deferred that fires when this has been
1042 new_size = data.get_size() + offset
1043 old_size = self.get_size()
1044 segment_size = self._version[3]
1045 num_old_segments = mathutil.div_ceil(old_size,
1047 num_new_segments = mathutil.div_ceil(new_size,
1049 log.msg("got %d old segments, %d new segments" % \
1050 (num_old_segments, num_new_segments))
1052 # We do a whole file re-encode if the file is an SDMF file.
1053 if self._version[2]: # version[2] == SDMF salt, which MDMF lacks
1054 log.msg("doing re-encode instead of in-place update")
1055 return self._do_modify_update(data, offset)
1057 # Otherwise, we can replace just the parts that are changing.
1058 log.msg("updating in place")
1059 d = self._do_update_update(data, offset)
1060 d.addCallback(self._decode_and_decrypt_segments, data, offset)
1061 d.addCallback(self._build_uploadable_and_finish, data, offset)
1065 def _do_modify_update(self, data, offset):
1067 I perform a file update by modifying the contents of the file
1068 after downloading it, then reuploading it. I am less efficient
1069 than _do_update_update, but am necessary for certain updates.
1071 def m(old, servermap, first_time):
1073 rest = offset + data.get_size()
1075 new += "".join(data.read(data.get_size()))
1078 return self._modify(m, None)
1081 def _do_update_update(self, data, offset):
1083 I start the Servermap update that gets us the data we need to
1084 continue the update process. I return a Deferred that fires when
1085 the servermap update is done.
1087 assert IMutableUploadable.providedBy(data)
1088 assert self.is_mutable()
1089 # offset == self.get_size() is valid and means that we are
1090 # appending data to the file.
1091 assert offset <= self.get_size()
1093 segsize = self._version[3]
1094 # We'll need the segment that the data starts in, regardless of
1095 # what we'll do later.
1096 start_segment = offset // segsize
1098 # We only need the end segment if the data we append does not go
1099 # beyond the current end-of-file.
1100 end_segment = start_segment
1101 if offset + data.get_size() < self.get_size():
1102 end_data = offset + data.get_size()
1103 # The last byte we touch is the end_data'th byte, which is actually
1104 # byte end_data - 1 because bytes are zero-indexed.
1106 end_segment = end_data // segsize
1108 self._start_segment = start_segment
1109 self._end_segment = end_segment
1111 # Now ask for the servermap to be updated in MODE_WRITE with
1112 # this update range.
1113 return self._update_servermap(update_range=(start_segment,
1117 def _decode_and_decrypt_segments(self, ignored, data, offset):
1119 After the servermap update, I take the encrypted and encoded
1120 data that the servermap fetched while doing its update and
1121 transform it into decoded-and-decrypted plaintext that can be
1122 used by the new uploadable. I return a Deferred that fires with
1125 r = Retrieve(self._node, self._storage_broker, self._servermap,
1127 # decode: takes in our blocks and salts from the servermap,
1128 # returns a Deferred that fires with the corresponding plaintext
1129 # segments. Does not download -- simply takes advantage of
1130 # existing infrastructure within the Retrieve class to avoid
1132 sm = self._servermap
1133 # XXX: If the methods in the servermap don't work as
1134 # abstractions, you should rewrite them instead of going around
1136 update_data = sm.update_data
1137 start_segments = {} # shnum -> start segment
1138 end_segments = {} # shnum -> end segment
1139 blockhashes = {} # shnum -> blockhash tree
1140 for (shnum, original_data) in update_data.iteritems():
1141 data = [d[1] for d in original_data if d[0] == self._version]
1142 # data is [(blockhashes,start,end)..]
1144 # Every data entry in our list should now be share shnum for
1145 # a particular version of the mutable file, so all of the
1146 # entries should be identical.
1148 assert [x for x in data if x != datum] == []
1150 # datum is (blockhashes,start,end)
1151 blockhashes[shnum] = datum[0]
1152 start_segments[shnum] = datum[1] # (block,salt) bytestrings
1153 end_segments[shnum] = datum[2]
1155 d1 = r.decode(start_segments, self._start_segment)
1156 d2 = r.decode(end_segments, self._end_segment)
1157 d3 = defer.succeed(blockhashes)
1158 return deferredutil.gatherResults([d1, d2, d3])
    def _build_uploadable_and_finish(self, segments_and_bht, data, offset):
        """
        After the process has the plaintext segments, I build the
        TransformingUploadable that the publisher will eventually
        re-upload to the grid. I then invoke the publisher with that
        uploadable, and return a Deferred when the publish operation has
        completed without issue.

        segments_and_bht is the [start-segment plaintext, end-segment
        plaintext, blockhashes] list produced by
        _decode_and_decrypt_segments.
        """
        # NOTE(review): the uploadable wraps the new data plus the old
        # plaintext of the first and last touched segments -- confirm this
        # argument list against TransformingUploadable's definition.
        u = TransformingUploadable(data, offset,
                                   segments_and_bht[0],
                                   segments_and_bht[1])
        p = Publish(self._node, self._storage_broker, self._servermap)
        # Publish.update() rewrites only the changed segments in place,
        # using the blockhash trees gathered during the servermap update.
        return p.update(u, offset, segments_and_bht[2], self._version)
1177 def _update_servermap(self, mode=MODE_WRITE, update_range=None):
1179 I update the servermap. I return a Deferred that fires when the
1180 servermap update is done.
1183 u = ServermapUpdater(self._node, self._storage_broker, Monitor(),
1186 update_range=update_range)
1188 u = ServermapUpdater(self._node, self._storage_broker, Monitor(),