import random

from zope.interface import implements
from twisted.internet import defer, reactor
from foolscap.api import eventually

from allmydata.interfaces import IMutableFileNode, ICheckable, ICheckResults, \
     NotEnoughSharesError, MDMF_VERSION, SDMF_VERSION, IMutableUploadable, \
     IMutableFileVersion, IWriteable
from allmydata.util import hashutil, log, consumer, deferredutil, mathutil
from allmydata.util.assertutil import precondition
from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI, \
     WriteableMDMFFileURI, ReadonlyMDMFFileURI
from allmydata.monitor import Monitor
from pycryptopp.cipher.aes import AES

from allmydata.mutable.publish import Publish, MutableData, \
     TransformingUploadable
from allmydata.mutable.common import MODE_READ, MODE_WRITE, MODE_CHECK, UnrecoverableFileError, \
     ResponseCache, UncoordinatedWriteError
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
from allmydata.mutable.retrieve import Retrieve
from allmydata.mutable.checker import MutableChecker, MutableCheckAndRepairer
from allmydata.mutable.repairer import Repairer
class BackoffAgent:
    """
    Default back-off policy for MutableFileVersion.modify(): exponential
    delay with jitter, giving up after a fixed number of retries.

    NOTE(review): the class header, the initialDelay/maxRetries attributes
    and several statements of delay() were lost in this paste; they are
    reconstructed here — confirm against upstream before relying on exact
    constants.
    """
    # these parameters are copied from foolscap.reconnector, which gets them
    # from twisted.internet.protocol.ReconnectingClientFactory
    initialDelay = 1.0
    factor = 2.7182818284590451 # (math.e)
    jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
    maxRetries = 4

    def __init__(self):
        self._delay = self.initialDelay
        self._retries = 0

    def delay(self, node, f):
        """Return the Failure to give up, or a Deferred that fires after
        a randomized exponential delay."""
        self._retries += 1
        if self._retries > self.maxRetries:
            return f # give up
        # grow the delay exponentially, then jitter it so competing
        # writers do not stay synchronized
        self._delay = self._delay * self.factor
        self._delay = random.normalvariate(self._delay,
                                           self._delay * self.jitter)
        d = defer.Deferred()
        reactor.callLater(self._delay, d.callback, None)
        return d
# use nodemaker.create_mutable_file() to make one of these

class MutableFileNode:
    implements(IMutableFileNode, ICheckable)

    def __init__(self, storage_broker, secret_holder,
                 default_encoding_parameters, history):
        """
        Hold long-lived state for one mutable file.  I do no network I/O
        here; keys, sizes and protocol version are filled in later by
        init_from_cap/create_with_keys and by servermap/retrieve
        operations.
        """
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._default_encoding_parameters = default_encoding_parameters
        self._history = history
        self._pubkey = None # filled in upon first read
        self._privkey = None # filled in if we're mutable
        # we keep track of the last encoding parameters that we use. These
        # are updated upon retrieve, and used by publish. If we publish
        # without ever reading (i.e. overwrite()), then we use these values.
        self._required_shares = default_encoding_parameters["k"]
        self._total_shares = default_encoding_parameters["n"]
        self._sharemap = {} # known shares, shnum-to-[nodeids]
        self._cache = ResponseCache()
        self._most_recent_size = None
        # filled in after __init__ if we're being created for the first time;
        # filled in by the servermap updater before publishing, otherwise.
        # set to this default value in case neither of those things happen,
        # or in case the servermap can't find any shares to tell us what
        # to publish as.
        self._protocol_version = None

        # all users of this MutableFileNode go through the serializer. This
        # takes advantage of the fact that Deferreds discard the callbacks
        # that they're done with, so we can keep using the same Deferred
        # forever without consuming more and more memory.
        self._serializer = defer.succeed(None)

        # Starting with MDMF, we can get these from caps if they're
        # there. Leave them alone for now; they'll be filled in by my
        # init_from_cap method if necessary.
        self._downloader_hints = {}
89 if hasattr(self, '_uri'):
90 return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
92 return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)
94 def init_from_cap(self, filecap):
95 # we have the URI, but we have not yet retrieved the public
96 # verification key, nor things like 'k' or 'N'. If and when someone
97 # wants to get our contents, we'll pull from shares and fill those
99 if isinstance(filecap, (WriteableMDMFFileURI, ReadonlyMDMFFileURI)):
100 self._protocol_version = MDMF_VERSION
101 elif isinstance(filecap, (ReadonlySSKFileURI, WriteableSSKFileURI)):
102 self._protocol_version = SDMF_VERSION
105 self._writekey = None
107 if not filecap.is_readonly() and filecap.is_mutable():
108 self._writekey = self._uri.writekey
109 self._readkey = self._uri.readkey
110 self._storage_index = self._uri.storage_index
111 self._fingerprint = self._uri.fingerprint
112 # the following values are learned during Retrieval
114 # self._required_shares
116 # and these are needed for Publish. They are filled in by Retrieval
117 # if possible, otherwise by the first peer that Publish talks to.
119 self._encprivkey = None
    def create_with_keys(self, (pubkey, privkey), contents,
                         version=SDMF_VERSION):
        """Call this to create a brand-new mutable file. It will create the
        shares, find homes for them, and upload the initial contents (created
        with the same rules as IClient.create_mutable_file() ). Returns a
        Deferred that fires (with the MutableFileNode instance you should
        use) when it completes.
        """
        self._pubkey, self._privkey = pubkey, privkey
        pubkey_s = self._pubkey.serialize()
        privkey_s = self._privkey.serialize()
        # the writekey is derived from the private key; the encrypted copy
        # of the private key is stored in the shares themselves
        self._writekey = hashutil.ssk_writekey_hash(privkey_s)
        self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
        self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
        if version == MDMF_VERSION:
            self._uri = WriteableMDMFFileURI(self._writekey, self._fingerprint)
            self._protocol_version = version
        elif version == SDMF_VERSION:
            self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
            self._protocol_version = version
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        initial_contents = self._get_initial_contents(contents)
        # servermap=None: brand-new file, nothing to coordinate against
        return self._upload(initial_contents, None)
148 def _get_initial_contents(self, contents):
150 return MutableData("")
152 if isinstance(contents, str):
153 return MutableData(contents)
155 if IMutableUploadable.providedBy(contents):
158 assert callable(contents), "%s should be callable, not %s" % \
159 (contents, type(contents))
160 return contents(self)
162 def _encrypt_privkey(self, writekey, privkey):
164 crypttext = enc.process(privkey)
167 def _decrypt_privkey(self, enc_privkey):
168 enc = AES(self._writekey)
169 privkey = enc.process(enc_privkey)
    # setters used by Retrieve/ServermapUpdater to fill in state learned
    # from the grid
    def _populate_pubkey(self, pubkey):
        self._pubkey = pubkey
    def _populate_required_shares(self, required_shares):
        self._required_shares = required_shares
    def _populate_total_shares(self, total_shares):
        self._total_shares = total_shares

    def _populate_privkey(self, privkey):
        self._privkey = privkey
    def _populate_encprivkey(self, encprivkey):
        self._encprivkey = encprivkey

    # share-data cache, so repeated retrieves can avoid refetching
    def _add_to_cache(self, verinfo, shnum, offset, data):
        self._cache.add(verinfo, shnum, offset, data)
    def _read_from_cache(self, verinfo, shnum, offset, length):
        return self._cache.read(verinfo, shnum, offset, length)

    def get_write_enabler(self, peerid):
        # per-server write-enabler, derived from our writekey
        assert len(peerid) == 20
        return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
    def get_renewal_secret(self, peerid):
        # client secret -> file secret -> per-server bucket secret
        assert len(peerid) == 20
        crs = self._secret_holder.get_renewal_secret()
        frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
        return hashutil.bucket_renewal_secret_hash(frs, peerid)
    def get_cancel_secret(self, peerid):
        # same derivation chain as the renewal secret
        assert len(peerid) == 20
        ccs = self._secret_holder.get_cancel_secret()
        fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
        return hashutil.bucket_cancel_secret_hash(fcs, peerid)
202 def get_writekey(self):
203 return self._writekey
204 def get_readkey(self):
206 def get_storage_index(self):
207 return self._storage_index
208 def get_fingerprint(self):
209 return self._fingerprint
210 def get_privkey(self):
212 def get_encprivkey(self):
213 return self._encprivkey
214 def get_pubkey(self):
217 def get_required_shares(self):
218 return self._required_shares
219 def get_total_shares(self):
220 return self._total_shares
222 ####################################
226 return self._most_recent_size
228 def get_current_size(self):
229 d = self.get_size_of_best_version()
230 d.addCallback(self._stash_size)
233 def _stash_size(self, size):
234 self._most_recent_size = size
239 def get_readcap(self):
240 return self._uri.get_readonly()
241 def get_verify_cap(self):
242 return self._uri.get_verify_cap()
243 def get_repair_cap(self):
244 if self._uri.is_readonly():
249 return self._uri.to_string()
251 def get_write_uri(self):
252 if self.is_readonly():
254 return self._uri.to_string()
256 def get_readonly_uri(self):
257 return self._uri.get_readonly().to_string()
259 def get_readonly(self):
260 if self.is_readonly():
262 ro = MutableFileNode(self._storage_broker, self._secret_holder,
263 self._default_encoding_parameters, self._history)
264 ro.init_from_cap(self._uri.get_readonly())
267 def is_mutable(self):
268 return self._uri.is_mutable()
270 def is_readonly(self):
271 return self._uri.is_readonly()
273 def is_unknown(self):
276 def is_allowed_in_immutable_directory(self):
277 return not self._uri.is_mutable()
279 def raise_error(self):
283 return hash((self.__class__, self._uri))
284 def __cmp__(self, them):
285 if cmp(type(self), type(them)):
286 return cmp(type(self), type(them))
287 if cmp(self.__class__, them.__class__):
288 return cmp(self.__class__, them.__class__)
289 return cmp(self._uri, them._uri)
    #################################
    # ICheckable

    def check(self, monitor, verify=False, add_lease=False):
        # delegate the (non-repairing) check to MutableChecker
        checker = MutableChecker(self, self._storage_broker,
                                 self._history, monitor)
        return checker.check(verify, add_lease)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        # same as check(), but the checker will also repair damage it finds
        checker = MutableCheckAndRepairer(self, self._storage_broker,
                                          self._history, monitor)
        return checker.check(verify, add_lease)
    #################################
    # IRepairable

    def repair(self, check_results, force=False):
        # check_results must provide ICheckResults (the adapter call below
        # doubles as validation)
        assert ICheckResults(check_results)
        r = Repairer(self, check_results)
        # NOTE(review): the lines that start the repairer and return its
        # Deferred were lost in this paste — restore from upstream
        # (Repairer.start) before relying on this method.
    #################################
    # IReadable

    def get_best_readable_version(self):
        """
        I return a Deferred that fires with a MutableFileVersion
        representing the best readable version of the file that I
        represent.
        """
        return self.get_readable_version()
327 def get_readable_version(self, servermap=None, version=None):
329 I return a Deferred that fires with an MutableFileVersion for my
330 version argument, if there is a recoverable file of that version
331 on the grid. If there is no recoverable version, I fire with an
332 UnrecoverableFileError.
334 If a servermap is provided, I look in there for the requested
335 version. If no servermap is provided, I create and update a new
338 If no version is provided, then I return a MutableFileVersion
339 representing the best recoverable version of the file.
341 d = self._get_version_from_servermap(MODE_READ, servermap, version)
342 def _build_version((servermap, their_version)):
343 assert their_version in servermap.recoverable_versions()
344 assert their_version in servermap.make_versionmap()
346 mfv = MutableFileVersion(self,
350 self._storage_broker,
352 history=self._history)
353 assert mfv.is_readonly()
354 mfv.set_downloader_hints(self._downloader_hints)
355 # our caller can use this to download the contents of the
358 return d.addCallback(_build_version)
361 def _get_version_from_servermap(self,
366 I return a Deferred that fires with (servermap, version).
368 This function performs validation and a servermap update. If it
369 returns (servermap, version), the caller can assume that:
370 - servermap was last updated in mode.
371 - version is recoverable, and corresponds to the servermap.
373 If version and servermap are provided to me, I will validate
374 that version exists in the servermap, and that the servermap was
377 If version is not provided, but servermap is, I will validate
378 the servermap and return the best recoverable version that I can
379 find in the servermap.
381 If the version is provided but the servermap isn't, I will
382 obtain a servermap that has been updated in the correct mode and
383 validate that version is found and recoverable.
385 If neither servermap nor version are provided, I will obtain a
386 servermap updated in the correct mode, and return the best
387 recoverable version that I can find in there.
390 if servermap and servermap.last_update_mode == mode:
391 d = defer.succeed(servermap)
393 d = self._get_servermap(mode)
395 def _get_version(servermap, v):
396 if v and v not in servermap.recoverable_versions():
399 v = servermap.best_recoverable_version()
401 raise UnrecoverableFileError("no recoverable versions")
403 return (servermap, v)
404 return d.addCallback(_get_version, version)
    def download_best_version(self):
        """
        I return a Deferred that fires with the contents of the best
        version of this mutable file.
        """
        # serialized so concurrent operations on this node cannot interleave
        return self._do_serialized(self._download_best_version)
415 def _download_best_version(self):
417 I am the serialized sibling of download_best_version.
419 d = self.get_best_readable_version()
420 d.addCallback(self._record_size)
421 d.addCallback(lambda version: version.download_to_data())
423 # It is possible that the download will fail because there
424 # aren't enough shares to be had. If so, we will try again after
425 # updating the servermap in MODE_WRITE, which may find more
426 # shares than updating in MODE_READ, as we just did. We can do
427 # this by getting the best mutable version and downloading from
428 # that -- the best mutable version will be a MutableFileVersion
429 # with a servermap that was last updated in MODE_WRITE, as we
430 # want. If this fails, then we give up.
431 def _maybe_retry(failure):
432 failure.trap(NotEnoughSharesError)
434 d = self.get_best_mutable_version()
435 d.addCallback(self._record_size)
436 d.addCallback(lambda version: version.download_to_data())
439 d.addErrback(_maybe_retry)
443 def _record_size(self, mfv):
445 I record the size of a mutable file version.
447 self._most_recent_size = mfv.get_size()
451 def get_size_of_best_version(self):
453 I return the size of the best version of this mutable file.
455 This is equivalent to calling get_size() on the result of
456 get_best_readable_version().
458 d = self.get_best_readable_version()
459 return d.addCallback(lambda mfv: mfv.get_size())
    #################################
    # IMutableFileNode

    def get_best_mutable_version(self, servermap=None):
        """
        I return a Deferred that fires with a MutableFileVersion
        representing the best readable version of the file that I
        represent. I am like get_best_readable_version, except that I
        will try to make a writeable version if I can.
        """
        return self.get_mutable_version(servermap=servermap)
475 def get_mutable_version(self, servermap=None, version=None):
477 I return a version of this mutable file. I return a Deferred
478 that fires with a MutableFileVersion
480 If version is provided, the Deferred will fire with a
481 MutableFileVersion initailized with that version. Otherwise, it
482 will fire with the best version that I can recover.
484 If servermap is provided, I will use that to find versions
485 instead of performing my own servermap update.
487 if self.is_readonly():
488 return self.get_readable_version(servermap=servermap,
491 # get_mutable_version => write intent, so we require that the
492 # servermap is updated in MODE_WRITE
493 d = self._get_version_from_servermap(MODE_WRITE, servermap, version)
494 def _build_version((servermap, smap_version)):
495 # these should have been set by the servermap update.
496 assert self._secret_holder
497 assert self._writekey
499 mfv = MutableFileVersion(self,
503 self._storage_broker,
507 history=self._history)
508 assert not mfv.is_readonly()
509 mfv.set_downloader_hints(self._downloader_hints)
512 return d.addCallback(_build_version)
    # XXX: I'm uncomfortable with the difference between upload and
    # overwrite, which, FWICT, is basically that you don't have to
    # do a servermap update before you overwrite. We split them up
    # that way anyway, so I guess there's no real difficulty in
    # offering both ways to callers, but it also makes the
    # public-facing API cluttery, and makes it hard to discern the
    # right way of doing things.

    # In general, we leave it to callers to ensure that they aren't
    # going to cause UncoordinatedWriteErrors when working with
    # MutableFileVersions. We know that the next three operations
    # (upload, overwrite, and modify) will all operate on the same
    # version, so we say that only one of them can be going on at once,
    # and serialize them to ensure that that actually happens, since as
    # the caller in this situation it is our job to do that.
    def overwrite(self, new_contents):
        """
        I overwrite the contents of the best recoverable version of this
        mutable file with new_contents. This is equivalent to calling
        overwrite on the result of get_best_mutable_version with
        new_contents as an argument. I return a Deferred that eventually
        fires with the results of my replacement process.
        """
        # TODO: Update downloader hints.
        return self._do_serialized(self._overwrite, new_contents)
542 def _overwrite(self, new_contents):
544 I am the serialized sibling of overwrite.
546 d = self.get_best_mutable_version()
547 d.addCallback(lambda mfv: mfv.overwrite(new_contents))
548 d.addCallback(self._did_upload, new_contents.get_size())
552 def upload(self, new_contents, servermap):
554 I overwrite the contents of the best recoverable version of this
555 mutable file with new_contents, using servermap instead of
556 creating/updating our own servermap. I return a Deferred that
557 fires with the results of my upload.
559 # TODO: Update downloader hints
560 return self._do_serialized(self._upload, new_contents, servermap)
563 def modify(self, modifier, backoffer=None):
565 I modify the contents of the best recoverable version of this
566 mutable file with the modifier. This is equivalent to calling
567 modify on the result of get_best_mutable_version. I return a
568 Deferred that eventually fires with an UploadResults instance
569 describing this process.
571 # TODO: Update downloader hints.
572 return self._do_serialized(self._modify, modifier, backoffer)
575 def _modify(self, modifier, backoffer):
577 I am the serialized sibling of modify.
579 d = self.get_best_mutable_version()
580 d.addCallback(lambda mfv: mfv.modify(modifier, backoffer))
    def download_version(self, servermap, version, fetch_privkey=False):
        """
        Download the specified version of this mutable file. I return a
        Deferred that fires with the contents of the specified version
        as a bytestring, or errbacks if the file is not recoverable.
        """
        d = self.get_readable_version(servermap, version)
        return d.addCallback(lambda mfv: mfv.download_to_data(fetch_privkey))

    def get_servermap(self, mode):
        """
        I return a servermap that has been updated in mode.

        mode should be one of MODE_READ, MODE_WRITE, MODE_CHECK or
        MODE_ANYTHING. See servermap.py for more on what these mean.
        """
        return self._do_serialized(self._get_servermap, mode)
604 def _get_servermap(self, mode):
606 I am a serialized twin to get_servermap.
608 servermap = ServerMap()
609 d = self._update_servermap(servermap, mode)
610 # The servermap will tell us about the most recent size of the
611 # file, so we may as well set that so that callers might get
612 # more data about us.
613 if not self._most_recent_size:
614 d.addCallback(self._get_size_from_servermap)
618 def _get_size_from_servermap(self, servermap):
620 I extract the size of the best version of this file and record
621 it in self._most_recent_size. I return the servermap that I was
624 if servermap.recoverable_versions():
625 v = servermap.best_recoverable_version()
626 size = v[4] # verinfo[4] == size
627 self._most_recent_size = size
631 def _update_servermap(self, servermap, mode):
632 u = ServermapUpdater(self, self._storage_broker, Monitor(), servermap,
635 self._history.notify_mapupdate(u.get_status())
    # kept for documentation: the protocol version is set in exactly two
    # ways, so no public setter is offered
    #def set_version(self, version):
        # I can be set in two ways:
        #  1. When the node is created.
        #  2. (for an existing share) when the Servermap is updated
        #     before publishing the node.
    #    assert version in (MDMF_VERSION, SDMF_VERSION)
    #    self._protocol_version = version

    def get_version(self):
        # SDMF_VERSION or MDMF_VERSION (None until learned; see __init__)
        return self._protocol_version
652 def _do_serialized(self, cb, *args, **kwargs):
653 # note: to avoid deadlock, this callable is *not* allowed to invoke
654 # other serialized methods within this (or any other)
655 # MutableFileNode. The callable should be a bound method of this same
658 self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
659 # we need to put off d.callback until this Deferred is finished being
660 # processed. Otherwise the caller's subsequent activities (like,
661 # doing other things with this node) can cause reentrancy problems in
662 # the Deferred code itself
663 self._serializer.addBoth(lambda res: eventually(d.callback, res))
664 # add a log.err just in case something really weird happens, because
665 # self._serializer stays around forever, therefore we won't see the
666 # usual Unhandled Error in Deferred that would give us a hint.
667 self._serializer.addErrback(log.err)
671 def _upload(self, new_contents, servermap):
673 A MutableFileNode still has to have some way of getting
674 published initially, which is what I am here for. After that,
675 all publishing, updating, modifying and so on happens through
678 assert self._pubkey, "update_servermap must be called before publish"
680 # Define IPublishInvoker with a set_downloader_hints method?
681 # Then have the publisher call that method when it's done publishing?
682 p = Publish(self, self._storage_broker, servermap)
684 self._history.notify_publish(p.get_status(),
685 new_contents.get_size())
686 d = p.publish(new_contents)
687 d.addCallback(self._did_upload, new_contents.get_size())
691 def set_downloader_hints(self, hints):
692 self._downloader_hints = hints
694 def _did_upload(self, res, size):
695 self._most_recent_size = size
class MutableFileVersion:
    """
    I represent a specific version (most likely the best version) of a
    mutable file.

    Since I implement IReadable, instances which hold a
    reference to an instance of me are guaranteed the ability (absent
    connection difficulties or unrecoverable versions) to read the file
    that I represent. Depending on whether I was initialized with a
    write capability or not, I may also provide callers the ability to
    overwrite or modify the contents of the mutable file that I
    reference.
    """
    implements(IMutableFileVersion, IWriteable)
726 self._servermap = servermap
727 self._version = version
728 self._storage_index = storage_index
729 self._write_secrets = write_secrets
730 self._history = history
731 self._storage_broker = storage_broker
733 #assert isinstance(readcap, IURI)
734 self._readcap = readcap
736 self._writekey = writekey
737 self._serializer = defer.succeed(None)
    def get_sequence_number(self):
        """
        Get the sequence number of the mutable version that I represent.
        """
        return self._version[0] # verinfo[0] == the sequence number

    def get_writekey(self):
        """
        I return a writekey or None if I don't have a writekey.
        """
        return self._writekey

    def set_downloader_hints(self, hints):
        """
        I set the downloader hints.
        """
        assert isinstance(hints, dict)

        self._downloader_hints = hints

    def get_downloader_hints(self):
        """
        I return the downloader hints.
        """
        return self._downloader_hints
    def overwrite(self, new_contents):
        """
        I overwrite the contents of this mutable file version with the
        data in new_contents.
        """
        assert not self.is_readonly()

        return self._do_serialized(self._overwrite, new_contents)

    def _overwrite(self, new_contents):
        # serialized sibling of overwrite(); requires a MODE_WRITE servermap
        assert IMutableUploadable.providedBy(new_contents)
        assert self._servermap.last_update_mode == MODE_WRITE
        return self._upload(new_contents)
    def modify(self, modifier, backoffer=None):
        """I use a modifier callback to apply a change to the mutable file.
        I implement the following pseudocode::

         obtain_mutable_filenode_lock()
         first_time = True
         while True:
             update_servermap(MODE_WRITE)
             old = retrieve_best_version()
             new = modifier(old, servermap, first_time)
             first_time = False
             if new == old: break
             try:
                 publish(new)
             except UncoordinatedWriteError, e:
                 backoffer(e)
                 continue
             break
         release_mutable_filenode_lock()

        The idea is that your modifier function can apply a delta of some
        sort, and it will be re-run as necessary until it succeeds. The
        modifier must inspect the old version to see whether its delta has
        already been applied: if so it should return the contents unmodified.

        Note that the modifier is required to run synchronously, and must not
        invoke any methods on this MutableFileNode instance.

        The backoff-er is a callable that is responsible for inserting a
        random delay between subsequent attempts, to help competing updates
        from colliding forever. It is also allowed to give up after a while.
        The backoffer is given two arguments: this MutableFileNode, and the
        Failure object that contains the UncoordinatedWriteError. It should
        return a Deferred that will fire when the next attempt should be
        made, or return the Failure if the loop should give up. If
        backoffer=None, a default one is provided which will perform
        exponential backoff, and give up after 4 tries. Note that the
        backoffer should not invoke any methods on this MutableFileNode
        instance, and it needs to be highly conscious of deadlock issues.
        """
        assert not self.is_readonly()

        return self._do_serialized(self._modify, modifier, backoffer)
833 def _modify(self, modifier, backoffer):
834 if backoffer is None:
835 backoffer = BackoffAgent().delay
836 return self._modify_and_retry(modifier, backoffer, True)
839 def _modify_and_retry(self, modifier, backoffer, first_time):
841 I try to apply modifier to the contents of this version of the
842 mutable file. If I succeed, I return an UploadResults instance
843 describing my success. If I fail, I try again after waiting for
846 log.msg("doing modify")
848 d = self._update_servermap()
850 # We ran into trouble; do MODE_CHECK so we're a little more
851 # careful on subsequent tries.
852 d = self._update_servermap(mode=MODE_CHECK)
854 d.addCallback(lambda ignored:
855 self._modify_once(modifier, first_time))
857 f.trap(UncoordinatedWriteError)
858 # Uh oh, it broke. We're allowed to trust the servermap for our
859 # first try, but after that we need to update it. It's
860 # possible that we've failed due to a race with another
861 # uploader, and if the race is to converge correctly, we
862 # need to know about that upload.
863 d2 = defer.maybeDeferred(backoffer, self, f)
864 d2.addCallback(lambda ignored:
865 self._modify_and_retry(modifier,
872 def _modify_once(self, modifier, first_time):
874 I attempt to apply a modifier to the contents of the mutable
877 assert self._servermap.last_update_mode != MODE_READ
879 # download_to_data is serialized, so we have to call this to
881 d = self._try_to_download_data()
882 def _apply(old_contents):
883 new_contents = modifier(old_contents, self._servermap, first_time)
884 precondition((isinstance(new_contents, str) or
885 new_contents is None),
886 "Modifier function must return a string "
889 if new_contents is None or new_contents == old_contents:
890 log.msg("no changes")
891 # no changes need to be made
894 # However, since Publish is not automatically doing a
895 # recovery when it observes UCWE, we need to do a second
896 # publish. See #551 for details. We'll basically loop until
897 # we managed an uncontested publish.
898 old_uploadable = MutableData(old_contents)
899 new_contents = old_uploadable
901 new_contents = MutableData(new_contents)
903 return self._upload(new_contents)
904 d.addCallback(_apply)
908 def is_readonly(self):
910 I return True if this MutableFileVersion provides no write
911 access to the file that it encapsulates, and False if it
912 provides the ability to modify the file.
914 return self._writekey is None
917 def is_mutable(self):
919 I return True, since mutable files are always mutable by
925 def get_storage_index(self):
927 I return the storage index of the reference that I encapsulate.
929 return self._storage_index
934 I return the length, in bytes, of this readable object.
936 return self._servermap.size_of_version(self._version)
939 def download_to_data(self, fetch_privkey=False):
941 I return a Deferred that fires with the contents of this
942 readable object as a byte string.
945 c = consumer.MemoryConsumer()
946 d = self.read(c, fetch_privkey=fetch_privkey)
947 d.addCallback(lambda mc: "".join(mc.chunks))
951 def _try_to_download_data(self):
953 I am an unserialized cousin of download_to_data; I am called
954 from the children of modify() to download the data associated
955 with this mutable version.
957 c = consumer.MemoryConsumer()
958 # modify will almost certainly write, so we need the privkey.
959 d = self._read(c, fetch_privkey=True)
960 d.addCallback(lambda mc: "".join(mc.chunks))
964 def read(self, consumer, offset=0, size=None, fetch_privkey=False):
966 I read a portion (possibly all) of the mutable file that I
967 reference into consumer.
969 return self._do_serialized(self._read, consumer, offset, size,
973 def _read(self, consumer, offset=0, size=None, fetch_privkey=False):
975 I am the serialized companion of read.
977 r = Retrieve(self._node, self._servermap, self._version, fetch_privkey)
979 self._history.notify_retrieve(r.get_status())
980 d = r.download(consumer, offset, size)
984 def _do_serialized(self, cb, *args, **kwargs):
985 # note: to avoid deadlock, this callable is *not* allowed to invoke
986 # other serialized methods within this (or any other)
987 # MutableFileNode. The callable should be a bound method of this same
990 self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
991 # we need to put off d.callback until this Deferred is finished being
992 # processed. Otherwise the caller's subsequent activities (like,
993 # doing other things with this node) can cause reentrancy problems in
994 # the Deferred code itself
995 self._serializer.addBoth(lambda res: eventually(d.callback, res))
996 # add a log.err just in case something really weird happens, because
997 # self._serializer stays around forever, therefore we won't see the
998 # usual Unhandled Error in Deferred that would give us a hint.
999 self._serializer.addErrback(log.err)
1003 def _upload(self, new_contents):
1004 #assert self._pubkey, "update_servermap must be called before publish"
1005 p = Publish(self._node, self._storage_broker, self._servermap)
1007 self._history.notify_publish(p.get_status(),
1008 new_contents.get_size())
1009 d = p.publish(new_contents)
1010 d.addCallback(self._did_upload, new_contents.get_size())
1014 def _did_upload(self, res, size):
1015 self._most_recent_size = size
    def update(self, data, offset):
        """
        Do an update of this mutable file version by inserting data at
        offset within the file. If offset is the EOF, this is an append
        operation. I return a Deferred that fires with the results of
        the update operation when it has completed.

        In cases where update does not append any data, or where it does
        not append so many blocks that the block count crosses a
        power-of-two boundary, this operation will use roughly
        O(data.get_size()) memory/bandwidth/CPU to perform the update.
        Otherwise, it must download, re-encode, and upload the entire
        file again, which will use O(filesize) resources.
        """
        return self._do_serialized(self._update, data, offset)
1035 def _update(self, data, offset):
1037 I update the mutable file version represented by this particular
1038 IMutableVersion by inserting the data in data at the offset
1039 offset. I return a Deferred that fires when this has been
1042 new_size = data.get_size() + offset
1043 old_size = self.get_size()
1044 segment_size = self._version[3]
1045 num_old_segments = mathutil.div_ceil(old_size,
1047 num_new_segments = mathutil.div_ceil(new_size,
1049 log.msg("got %d old segments, %d new segments" % \
1050 (num_old_segments, num_new_segments))
1052 # We do a whole file re-encode if the file is an SDMF file.
1053 if self._version[2]: # version[2] == SDMF salt, which MDMF lacks
1054 log.msg("doing re-encode instead of in-place update")
1055 return self._do_modify_update(data, offset)
1057 # Otherwise, we can replace just the parts that are changing.
1058 log.msg("updating in place")
1059 d = self._do_update_update(data, offset)
1060 d.addCallback(self._decode_and_decrypt_segments, data, offset)
1061 d.addCallback(self._build_uploadable_and_finish, data, offset)
1065 def _do_modify_update(self, data, offset):
1067 I perform a file update by modifying the contents of the file
1068 after downloading it, then reuploading it. I am less efficient
1069 than _do_update_update, but am necessary for certain updates.
1071 def m(old, servermap, first_time):
1073 rest = offset + data.get_size()
1075 new += "".join(data.read(data.get_size()))
1078 return self._modify(m, None)
1081 def _do_update_update(self, data, offset):
1083 I start the Servermap update that gets us the data we need to
1084 continue the update process. I return a Deferred that fires when
1085 the servermap update is done.
1087 assert IMutableUploadable.providedBy(data)
1088 assert self.is_mutable()
1089 # offset == self.get_size() is valid and means that we are
1090 # appending data to the file.
1091 assert offset <= self.get_size()
1093 segsize = self._version[3]
1094 # We'll need the segment that the data starts in, regardless of
1095 # what we'll do later.
1096 start_segment = offset // segsize
1098 # We only need the end segment if the data we append does not go
1099 # beyond the current end-of-file.
1100 end_segment = start_segment
1101 if offset + data.get_size() < self.get_size():
1102 end_data = offset + data.get_size()
1103 # The last byte we touch is the end_data'th byte, which is actually
1104 # byte end_data - 1 because bytes are zero-indexed.
1106 end_segment = end_data // segsize
1108 self._start_segment = start_segment
1109 self._end_segment = end_segment
1111 # Now ask for the servermap to be updated in MODE_WRITE with
1112 # this update range.
1113 return self._update_servermap(update_range=(start_segment,
1117 def _decode_and_decrypt_segments(self, ignored, data, offset):
1119 After the servermap update, I take the encrypted and encoded
1120 data that the servermap fetched while doing its update and
1121 transform it into decoded-and-decrypted plaintext that can be
1122 used by the new uploadable. I return a Deferred that fires with
1125 r = Retrieve(self._node, self._servermap, self._version)
1126 # decode: takes in our blocks and salts from the servermap,
1127 # returns a Deferred that fires with the corresponding plaintext
1128 # segments. Does not download -- simply takes advantage of
1129 # existing infrastructure within the Retrieve class to avoid
1131 sm = self._servermap
1132 # XXX: If the methods in the servermap don't work as
1133 # abstractions, you should rewrite them instead of going around
1135 update_data = sm.update_data
1136 start_segments = {} # shnum -> start segment
1137 end_segments = {} # shnum -> end segment
1138 blockhashes = {} # shnum -> blockhash tree
1139 for (shnum, data) in update_data.iteritems():
1140 data = [d[1] for d in data if d[0] == self._version]
1142 # Every data entry in our list should now be share shnum for
1143 # a particular version of the mutable file, so all of the
1144 # entries should be identical.
1146 assert filter(lambda x: x != datum, data) == []
1148 blockhashes[shnum] = datum[0]
1149 start_segments[shnum] = datum[1]
1150 end_segments[shnum] = datum[2]
1152 d1 = r.decode(start_segments, self._start_segment)
1153 d2 = r.decode(end_segments, self._end_segment)
1154 d3 = defer.succeed(blockhashes)
1155 return deferredutil.gatherResults([d1, d2, d3])
1158 def _build_uploadable_and_finish(self, segments_and_bht, data, offset):
1160 After the process has the plaintext segments, I build the
1161 TransformingUploadable that the publisher will eventually
1162 re-upload to the grid. I then invoke the publisher with that
1163 uploadable, and return a Deferred when the publish operation has
1164 completed without issue.
1166 u = TransformingUploadable(data, offset,
1168 segments_and_bht[0],
1169 segments_and_bht[1])
1170 p = Publish(self._node, self._storage_broker, self._servermap)
1171 return p.update(u, offset, segments_and_bht[2], self._version)
1174 def _update_servermap(self, mode=MODE_WRITE, update_range=None):
1176 I update the servermap. I return a Deferred that fires when the
1177 servermap update is done.
1180 u = ServermapUpdater(self._node, self._storage_broker, Monitor(),
1183 update_range=update_range)
1185 u = ServermapUpdater(self._node, self._storage_broker, Monitor(),