4 from zope.interface import implements
5 from twisted.internet import defer, reactor
6 from foolscap.api import eventually
7 from allmydata.interfaces import IMutableFileNode, ICheckable, ICheckResults, \
8 NotEnoughSharesError, MDMF_VERSION, SDMF_VERSION, IMutableUploadable, \
9 IMutableFileVersion, IWriteable
10 from allmydata.util import hashutil, log, consumer, deferredutil, mathutil
11 from allmydata.util.assertutil import precondition
12 from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI, \
13 WriteableMDMFFileURI, ReadonlyMDMFFileURI
14 from allmydata.monitor import Monitor
15 from pycryptopp.cipher.aes import AES
17 from allmydata.mutable.publish import Publish, MutableData,\
18 TransformingUploadable
19 from allmydata.mutable.common import MODE_READ, MODE_WRITE, MODE_CHECK, UnrecoverableFileError, \
20 ResponseCache, UncoordinatedWriteError
21 from allmydata.mutable.servermap import ServerMap, ServermapUpdater
22 from allmydata.mutable.retrieve import Retrieve
23 from allmydata.mutable.checker import MutableChecker, MutableCheckAndRepairer
24 from allmydata.mutable.repairer import Repairer
28 # these parameters are copied from foolscap.reconnector, which gets them
29 # from twisted.internet.protocol.ReconnectingClientFactory
    # Exponential-backoff parameters, copied (per the comment above) from
    # foolscap.reconnector / twisted ReconnectingClientFactory.
    factor = 2.7182818284590451 # (math.e)
    jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
        # body of the initializer (its def line is not shown in this
        # excerpt): start from the class's initial delay.
        self._delay = self.initialDelay

    def delay(self, node, f):
        # NOTE(review): several lines of this method are not shown in this
        # excerpt; indentation below is approximate.
        # Grow the delay geometrically, then randomize it around the new
        # mean so competing retries do not stay in lockstep.
        self._delay = self._delay * self.factor
        self._delay = random.normalvariate(self._delay,
                                           self._delay * self.jitter)
        # 'd' is presumably a Deferred created on a line not shown here;
        # it fires after the backoff delay — confirm.
        reactor.callLater(self._delay, d.callback, None)
49 # use nodemaker.create_mutable_file() to make one of these
class MutableFileNode:
    """
    A mutable file, addressed by its write- or read-cap.

    Use nodemaker.create_mutable_file() to make one of these. All
    operations are funneled through self._serializer so only one runs at
    a time (see _do_serialized).
    """
    implements(IMutableFileNode, ICheckable)

    def __init__(self, storage_broker, secret_holder,
                 default_encoding_parameters, history):
        # NOTE(review): a few original lines are not shown in this excerpt.
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._default_encoding_parameters = default_encoding_parameters
        self._history = history
        self._pubkey = None # filled in upon first read
        self._privkey = None # filled in if we're mutable
        # we keep track of the last encoding parameters that we use. These
        # are updated upon retrieve, and used by publish. If we publish
        # without ever reading (i.e. overwrite()), then we use these values.
        self._required_shares = default_encoding_parameters["k"]
        self._total_shares = default_encoding_parameters["n"]
        self._sharemap = {} # known shares, shnum-to-[nodeids]
        self._cache = ResponseCache()
        self._most_recent_size = None
        # filled in after __init__ if we're being created for the first time;
        # filled in by the servermap updater before publishing, otherwise.
        # set to this default value in case neither of those things happen,
        # or in case the servermap can't find any shares to tell us what
        self._protocol_version = None

        # all users of this MutableFileNode go through the serializer. This
        # takes advantage of the fact that Deferreds discard the callbacks
        # that they're done with, so we can keep using the same Deferred
        # forever without consuming more and more memory.
        self._serializer = defer.succeed(None)

        # Starting with MDMF, we can get these from caps if they're
        # there. Leave them alone for now; they'll be filled in by my
        # init_from_cap method if necessary.
        self._downloader_hints = {}
        # body of __repr__ (its def line is not shown in this excerpt):
        # show RO/RW and the abbreviated cap once _uri has been set,
        # otherwise a placeholder form.
        if hasattr(self, '_uri'):
            return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
        return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)
    def init_from_cap(self, filecap):
        """Configure this node from an existing cap (SDMF or MDMF, RO or RW)."""
        # we have the URI, but we have not yet retrieved the public
        # verification key, nor things like 'k' or 'N'. If and when someone
        # wants to get our contents, we'll pull from shares and fill those
        if isinstance(filecap, (WriteableMDMFFileURI, ReadonlyMDMFFileURI)):
            self._protocol_version = MDMF_VERSION
        elif isinstance(filecap, (ReadonlySSKFileURI, WriteableSSKFileURI)):
            self._protocol_version = SDMF_VERSION

        self._writekey = None
        # NOTE(review): self._uri is presumably assigned from filecap on a
        # line not shown in this excerpt — confirm.
        if not filecap.is_readonly() and filecap.is_mutable():
            self._writekey = self._uri.writekey
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        self._fingerprint = self._uri.fingerprint
        # the following values are learned during Retrieval
        # self._required_shares
        # and these are needed for Publish. They are filled in by Retrieval
        # if possible, otherwise by the first peer that Publish talks to.
        self._encprivkey = None

        # Starting with MDMF caps, we allowed arbitrary extensions in
        # caps. If we were initialized with a cap that had extensions,
        # we want to remember them so we can tell MutableFileVersions
        # NOTE(review): a guard around the extension parsing is not shown
        # in this excerpt; indentation below is approximate.
        extensions = self._uri.get_extension_params()
        extensions = map(int, extensions)
        suspected_k, suspected_segsize = extensions
        self._downloader_hints['k'] = suspected_k
        self._downloader_hints['segsize'] = suspected_segsize
    def create_with_keys(self, (pubkey, privkey), contents,
                         version=SDMF_VERSION):
        """Call this to create a brand-new mutable file. It will create the
        shares, find homes for them, and upload the initial contents (created
        with the same rules as IClient.create_mutable_file() ). Returns a
        Deferred that fires (with the MutableFileNode instance you should
        use) when it completes.
        """
        self._pubkey, self._privkey = pubkey, privkey
        pubkey_s = self._pubkey.serialize()
        privkey_s = self._privkey.serialize()
        # The writekey is derived from the private key; the privkey is then
        # stored encrypted under that writekey (see _encrypt_privkey).
        self._writekey = hashutil.ssk_writekey_hash(privkey_s)
        self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
        self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
        if version == MDMF_VERSION:
            self._uri = WriteableMDMFFileURI(self._writekey, self._fingerprint)
            self._protocol_version = version
        elif version == SDMF_VERSION:
            self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
            self._protocol_version = version
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        initial_contents = self._get_initial_contents(contents)
        return self._upload(initial_contents, None)
    def _get_initial_contents(self, contents):
        # Normalize 'contents' into an IMutableUploadable.
        # NOTE(review): several guard lines are not shown in this excerpt;
        # indentation below is approximate.
            return MutableData("")

        if isinstance(contents, str):
            return MutableData(contents)

        if IMutableUploadable.providedBy(contents):

        # last resort: treat 'contents' as a factory callable
        assert callable(contents), "%s should be callable, not %s" % \
               (contents, type(contents))
        return contents(self)

    def _encrypt_privkey(self, writekey, privkey):
        # AES-encrypt the signing key under the writekey; inverse of
        # _decrypt_privkey. (The cipher construction and return statement
        # are not shown in this excerpt.)
        crypttext = enc.process(privkey)

    def _decrypt_privkey(self, enc_privkey):
        # Decrypt the stored private key with our writekey. (The return
        # statement is not shown in this excerpt.)
        enc = AES(self._writekey)
        privkey = enc.process(enc_privkey)
    # Setters that record state learned from shares (presumably invoked by
    # the servermap/retrieve machinery — confirm against callers).
    def _populate_pubkey(self, pubkey):
        self._pubkey = pubkey
    def _populate_required_shares(self, required_shares):
        self._required_shares = required_shares
    def _populate_total_shares(self, total_shares):
        self._total_shares = total_shares

    def _populate_privkey(self, privkey):
        self._privkey = privkey
    def _populate_encprivkey(self, encprivkey):
        self._encprivkey = encprivkey
    # Pass-through helpers for the per-node ResponseCache (see __init__).
    def _add_to_cache(self, verinfo, shnum, offset, data):
        self._cache.add(verinfo, shnum, offset, data)
    def _read_from_cache(self, verinfo, shnum, offset, length):
        return self._cache.read(verinfo, shnum, offset, length)
199 def get_write_enabler(self, peerid):
200 assert len(peerid) == 20
201 return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
202 def get_renewal_secret(self, peerid):
203 assert len(peerid) == 20
204 crs = self._secret_holder.get_renewal_secret()
205 frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
206 return hashutil.bucket_renewal_secret_hash(frs, peerid)
207 def get_cancel_secret(self, peerid):
208 assert len(peerid) == 20
209 ccs = self._secret_holder.get_cancel_secret()
210 fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
211 return hashutil.bucket_cancel_secret_hash(fcs, peerid)
    # simple accessors for key material and encoding parameters
    def get_writekey(self):
        return self._writekey
    def get_readkey(self):
        # (the return statement is not shown in this excerpt)
    def get_storage_index(self):
        return self._storage_index
    def get_fingerprint(self):
        return self._fingerprint
    def get_privkey(self):
        # (the return statement is not shown in this excerpt)
    def get_encprivkey(self):
        return self._encprivkey
    def get_pubkey(self):
        # (the return statement is not shown in this excerpt)
    def get_required_shares(self):
        return self._required_shares
    def get_total_shares(self):
        return self._total_shares
    ####################################
        # body of a size accessor (its def line is not shown in this
        # excerpt): report the last size we observed.
        return self._most_recent_size

    def get_current_size(self):
        # Fetch the size of the best version, remembering it via
        # _stash_size. (The 'return d' line is not shown in this excerpt.)
        d = self.get_size_of_best_version()
        d.addCallback(self._stash_size)

    def _stash_size(self, size):
        # Cache the observed size; presumably returns 'size' on a line not
        # shown here so callbacks can chain — confirm.
        self._most_recent_size = size
    # cap/URI accessors. NOTE(review): several lines in this region are not
    # shown in this excerpt; indentation is approximate.
    def get_readcap(self):
        return self._uri.get_readonly()
    def get_verify_cap(self):
        return self._uri.get_verify_cap()
    def get_repair_cap(self):
        if self._uri.is_readonly():
        # (intervening lines not shown)
        return self._uri.to_string()

    def get_write_uri(self):
        if self.is_readonly():
        # (the read-only branch's return — presumably None — is not shown;
        # confirm)
        return self._uri.to_string()

    def get_readonly_uri(self):
        return self._uri.get_readonly().to_string()

    def get_readonly(self):
        if self.is_readonly():
        # (the early-return branch is not shown)
        # build a fresh node around the read-only form of our cap
        ro = MutableFileNode(self._storage_broker, self._secret_holder,
                             self._default_encoding_parameters, self._history)
        ro.init_from_cap(self._uri.get_readonly())
        # (the 'return ro' line is not shown)
    # predicates delegate to the URI object
    def is_mutable(self):
        return self._uri.is_mutable()

    def is_readonly(self):
        return self._uri.is_readonly()

    def is_unknown(self):
        # (body not shown in this excerpt)

    def is_allowed_in_immutable_directory(self):
        return not self._uri.is_mutable()

    def raise_error(self):
        # (body not shown in this excerpt)

        # body of __hash__ (its def line is not shown in this excerpt):
        # identity is (class, URI)
        return hash((self.__class__, self._uri))
295 def __cmp__(self, them):
296 if cmp(type(self), type(them)):
297 return cmp(type(self), type(them))
298 if cmp(self.__class__, them.__class__):
299 return cmp(self.__class__, them.__class__)
300 return cmp(self._uri, them._uri)
303 #################################
306 def check(self, monitor, verify=False, add_lease=False):
307 checker = MutableChecker(self, self._storage_broker,
308 self._history, monitor)
309 return checker.check(verify, add_lease)
311 def check_and_repair(self, monitor, verify=False, add_lease=False):
312 checker = MutableCheckAndRepairer(self, self._storage_broker,
313 self._history, monitor)
314 return checker.check(verify, add_lease)
    #################################

    def repair(self, check_results, force=False):
        # validate/adapt the results object, then hand off to Repairer.
        # (The line that starts the Repairer is not shown in this excerpt.)
        assert ICheckResults(check_results)
        r = Repairer(self, check_results)

    #################################
    def get_best_readable_version(self):
        """
        I return a Deferred that fires with a MutableFileVersion
        representing the best readable version of the file that I
        """
        return self.get_readable_version()

    def get_readable_version(self, servermap=None, version=None):
        """
        I return a Deferred that fires with an MutableFileVersion for my
        version argument, if there is a recoverable file of that version
        on the grid. If there is no recoverable version, I fire with an
        UnrecoverableFileError.

        If a servermap is provided, I look in there for the requested
        version. If no servermap is provided, I create and update a new

        If no version is provided, then I return a MutableFileVersion
        representing the best recoverable version of the file.
        """
        d = self._get_version_from_servermap(MODE_READ, servermap, version)
        def _build_version((servermap, their_version)):
            assert their_version in servermap.recoverable_versions()
            assert their_version in servermap.make_versionmap()

            # NOTE(review): several MutableFileVersion constructor
            # arguments are not shown in this excerpt.
            mfv = MutableFileVersion(self,
                                     self._storage_broker,
                                     history=self._history)
            assert mfv.is_readonly()
            mfv.set_downloader_hints(self._downloader_hints)
            # our caller can use this to download the contents of the
        return d.addCallback(_build_version)
    def _get_version_from_servermap(self,
        # NOTE(review): the remaining parameters of this signature (mode,
        # servermap, version) are not shown in this excerpt, nor are a few
        # lines of the body; indentation below is approximate.
        """
        I return a Deferred that fires with (servermap, version).

        This function performs validation and a servermap update. If it
        returns (servermap, version), the caller can assume that:
        - servermap was last updated in mode.
        - version is recoverable, and corresponds to the servermap.

        If version and servermap are provided to me, I will validate
        that version exists in the servermap, and that the servermap was

        If version is not provided, but servermap is, I will validate
        the servermap and return the best recoverable version that I can
        find in the servermap.

        If the version is provided but the servermap isn't, I will
        obtain a servermap that has been updated in the correct mode and
        validate that version is found and recoverable.

        If neither servermap nor version are provided, I will obtain a
        servermap updated in the correct mode, and return the best
        recoverable version that I can find in there.
        """
        # reuse a suitable servermap if given; otherwise fetch a fresh one
        if servermap and servermap.last_update_mode == mode:
            d = defer.succeed(servermap)
        # (the 'else:' line is not shown)
        d = self._get_servermap(mode)

        def _get_version(servermap, v):
            if v and v not in servermap.recoverable_versions():
            # (intervening lines not shown)
            v = servermap.best_recoverable_version()
            # (guard condition not shown)
            raise UnrecoverableFileError("no recoverable versions")
            return (servermap, v)
        return d.addCallback(_get_version, version)
    def download_best_version(self):
        """
        I return a Deferred that fires with the contents of the best
        version of this mutable file.
        """
        return self._do_serialized(self._download_best_version)

    def _download_best_version(self):
        """
        I am the serialized sibling of download_best_version.
        """
        d = self.get_best_readable_version()
        d.addCallback(self._record_size)
        d.addCallback(lambda version: version.download_to_data())

        # It is possible that the download will fail because there
        # aren't enough shares to be had. If so, we will try again after
        # updating the servermap in MODE_WRITE, which may find more
        # shares than updating in MODE_READ, as we just did. We can do
        # this by getting the best mutable version and downloading from
        # that -- the best mutable version will be a MutableFileVersion
        # with a servermap that was last updated in MODE_WRITE, as we
        # want. If this fails, then we give up.
        def _maybe_retry(failure):
            failure.trap(NotEnoughSharesError)

            d = self.get_best_mutable_version()
            d.addCallback(self._record_size)
            d.addCallback(lambda version: version.download_to_data())
            # (the return of the retry Deferred is not shown)

        d.addErrback(_maybe_retry)
        # (the 'return d' line is not shown in this excerpt)

    def _record_size(self, mfv):
        """
        I record the size of a mutable file version.
        """
        self._most_recent_size = mfv.get_size()
        # (presumably returns mfv so callbacks can chain — line not shown;
        # confirm)

    def get_size_of_best_version(self):
        """
        I return the size of the best version of this mutable file.

        This is equivalent to calling get_size() on the result of
        get_best_readable_version().
        """
        d = self.get_best_readable_version()
        return d.addCallback(lambda mfv: mfv.get_size())
473 #################################
    def get_best_mutable_version(self, servermap=None):
        """
        I return a Deferred that fires with a MutableFileVersion
        representing the best readable version of the file that I
        represent. I am like get_best_readable_version, except that I
        will try to make a writable version if I can.
        """
        return self.get_mutable_version(servermap=servermap)

    def get_mutable_version(self, servermap=None, version=None):
        """
        I return a version of this mutable file. I return a Deferred
        that fires with a MutableFileVersion

        If version is provided, the Deferred will fire with a
        MutableFileVersion initialized with that version. Otherwise, it
        will fire with the best version that I can recover.

        If servermap is provided, I will use that to find versions
        instead of performing my own servermap update.
        """
        # NOTE(review): several lines are not shown in this excerpt;
        # indentation below is approximate.
        if self.is_readonly():
            return self.get_readable_version(servermap=servermap,

        # get_mutable_version => write intent, so we require that the
        # servermap is updated in MODE_WRITE
        d = self._get_version_from_servermap(MODE_WRITE, servermap, version)
        def _build_version((servermap, smap_version)):
            # these should have been set by the servermap update.
            assert self._secret_holder
            assert self._writekey

            # (several constructor arguments are not shown)
            mfv = MutableFileVersion(self,
                                     self._storage_broker,
                                     history=self._history)
            assert not mfv.is_readonly()
            mfv.set_downloader_hints(self._downloader_hints)
        return d.addCallback(_build_version)
526 # XXX: I'm uncomfortable with the difference between upload and
527 # overwrite, which, FWICT, is basically that you don't have to
528 # do a servermap update before you overwrite. We split them up
529 # that way anyway, so I guess there's no real difficulty in
530 # offering both ways to callers, but it also makes the
531 # public-facing API cluttery, and makes it hard to discern the
532 # right way of doing things.
534 # In general, we leave it to callers to ensure that they aren't
535 # going to cause UncoordinatedWriteErrors when working with
536 # MutableFileVersions. We know that the next three operations
537 # (upload, overwrite, and modify) will all operate on the same
538 # version, so we say that only one of them can be going on at once,
539 # and serialize them to ensure that that actually happens, since as
540 # the caller in this situation it is our job to do that.
    def overwrite(self, new_contents):
        """
        I overwrite the contents of the best recoverable version of this
        mutable file with new_contents. This is equivalent to calling
        overwrite on the result of get_best_mutable_version with
        new_contents as an argument. I return a Deferred that eventually
        fires with the results of my replacement process.
        """
        # TODO: Update downloader hints.
        return self._do_serialized(self._overwrite, new_contents)

    def _overwrite(self, new_contents):
        """
        I am the serialized sibling of overwrite.
        """
        d = self.get_best_mutable_version()
        d.addCallback(lambda mfv: mfv.overwrite(new_contents))
        d.addCallback(self._did_upload, new_contents.get_size())
        # (the 'return d' line is not shown in this excerpt)

    def upload(self, new_contents, servermap):
        """
        I overwrite the contents of the best recoverable version of this
        mutable file with new_contents, using servermap instead of
        creating/updating our own servermap. I return a Deferred that
        fires with the results of my upload.
        """
        # TODO: Update downloader hints
        return self._do_serialized(self._upload, new_contents, servermap)

    def modify(self, modifier, backoffer=None):
        """
        I modify the contents of the best recoverable version of this
        mutable file with the modifier. This is equivalent to calling
        modify on the result of get_best_mutable_version. I return a
        Deferred that eventually fires with an UploadResults instance
        describing this process.
        """
        # TODO: Update downloader hints.
        return self._do_serialized(self._modify, modifier, backoffer)

    def _modify(self, modifier, backoffer):
        """
        I am the serialized sibling of modify.
        """
        d = self.get_best_mutable_version()
        d.addCallback(lambda mfv: mfv.modify(modifier, backoffer))
        # (the 'return d' line is not shown in this excerpt)
    def download_version(self, servermap, version, fetch_privkey=False):
        """
        Download the specified version of this mutable file. I return a
        Deferred that fires with the contents of the specified version
        as a bytestring, or errbacks if the file is not recoverable.
        """
        d = self.get_readable_version(servermap, version)
        return d.addCallback(lambda mfv: mfv.download_to_data(fetch_privkey))

    def get_servermap(self, mode):
        """
        I return a servermap that has been updated in mode.

        mode should be one of MODE_READ, MODE_WRITE, MODE_CHECK or
        MODE_ANYTHING. See servermap.py for more on what these mean.
        """
        return self._do_serialized(self._get_servermap, mode)
    def _get_servermap(self, mode):
        """
        I am a serialized twin to get_servermap.
        """
        servermap = ServerMap()
        d = self._update_servermap(servermap, mode)
        # The servermap will tell us about the most recent size of the
        # file, so we may as well set that so that callers might get
        # more data about us.
        if not self._most_recent_size:
            d.addCallback(self._get_size_from_servermap)
        # (the 'return d' line is not shown in this excerpt)

    def _get_size_from_servermap(self, servermap):
        """
        I extract the size of the best version of this file and record
        it in self._most_recent_size. I return the servermap that I was
        """
        if servermap.recoverable_versions():
            v = servermap.best_recoverable_version()
            size = v[4] # verinfo[4] == size
            self._most_recent_size = size
        # (the 'return servermap' line is not shown in this excerpt)

    def _update_servermap(self, servermap, mode):
        # NOTE(review): additional ServermapUpdater arguments and the guard
        # around the history notification are not shown in this excerpt;
        # indentation below is approximate.
        u = ServermapUpdater(self, self._storage_broker, Monitor(), servermap,
            self._history.notify_mapupdate(u.get_status())
650 #def set_version(self, version):
651 # I can be set in two ways:
652 # 1. When the node is created.
653 # 2. (for an existing share) when the Servermap is updated
655 # assert version in (MDMF_VERSION, SDMF_VERSION)
656 # self._protocol_version = version
659 def get_version(self):
660 return self._protocol_version
    def _do_serialized(self, cb, *args, **kwargs):
        # note: to avoid deadlock, this callable is *not* allowed to invoke
        # other serialized methods within this (or any other)
        # MutableFileNode. The callable should be a bound method of this same
        # NOTE(review): the creation of the result Deferred 'd' is on a line
        # not shown in this excerpt.
        self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
        # we need to put off d.callback until this Deferred is finished being
        # processed. Otherwise the caller's subsequent activities (like,
        # doing other things with this node) can cause reentrancy problems in
        # the Deferred code itself
        self._serializer.addBoth(lambda res: eventually(d.callback, res))
        # add a log.err just in case something really weird happens, because
        # self._serializer stays around forever, therefore we won't see the
        # usual Unhandled Error in Deferred that would give us a hint.
        self._serializer.addErrback(log.err)
    def _upload(self, new_contents, servermap):
        """
        A MutableFileNode still has to have some way of getting
        published initially, which is what I am here for. After that,
        all publishing, updating, modifying and so on happens through
        """
        assert self._pubkey, "update_servermap must be called before publish"

        # Define IPublishInvoker with a set_downloader_hints method?
        # Then have the publisher call that method when it's done publishing?
        p = Publish(self, self._storage_broker, servermap)
        # NOTE(review): the guard around this notification (presumably
        # 'if self._history:') is not shown in this excerpt.
            self._history.notify_publish(p.get_status(),
                                         new_contents.get_size())
        d = p.publish(new_contents)
        d.addCallback(self._did_upload, new_contents.get_size())
        # (the 'return d' line is not shown in this excerpt)
702 def set_downloader_hints(self, hints):
703 self._downloader_hints = hints
704 extensions = hints.values()
705 self._uri.set_extension_params(extensions)
    def _did_upload(self, res, size):
        # remember the size we just published; presumably returns 'res' on a
        # line not shown in this excerpt — confirm.
        self._most_recent_size = size
class MutableFileVersion:
    """
    I represent a specific version (most likely the best version) of a

    Since I implement IReadable, instances which hold a
    reference to an instance of me are guaranteed the ability (absent
    connection difficulties or unrecoverable versions) to read the file
    that I represent. Depending on whether I was initialized with a
    write capability or not, I may also provide callers the ability to
    overwrite or modify the contents of the mutable file that I
    """
    implements(IMutableFileVersion, IWriteable)

    # NOTE(review): the __init__ signature is not shown in this excerpt;
    # the assignments below form its body.
        self._servermap = servermap
        self._version = version
        self._storage_index = storage_index
        self._write_secrets = write_secrets
        self._history = history
        self._storage_broker = storage_broker

        #assert isinstance(readcap, IURI)
        self._readcap = readcap

        # a None writekey marks this version as read-only (see is_readonly)
        self._writekey = writekey
        # per-version serializer, same pattern as MutableFileNode
        self._serializer = defer.succeed(None)
    def get_sequence_number(self):
        """
        Get the sequence number of the mutable version that I represent.
        """
        return self._version[0] # verinfo[0] == the sequence number

    def get_writekey(self):
        """
        I return a writekey or None if I don't have a writekey.
        """
        return self._writekey

    def set_downloader_hints(self, hints):
        """
        I set the downloader hints.
        """
        assert isinstance(hints, dict)

        self._downloader_hints = hints

    def get_downloader_hints(self):
        """
        I return the downloader hints.
        """
        return self._downloader_hints
    def overwrite(self, new_contents):
        """
        I overwrite the contents of this mutable file version with the
        data in new_contents.
        """
        assert not self.is_readonly()

        return self._do_serialized(self._overwrite, new_contents)

    def _overwrite(self, new_contents):
        # serialized sibling of overwrite(); requires a MODE_WRITE servermap
        assert IMutableUploadable.providedBy(new_contents)
        assert self._servermap.last_update_mode == MODE_WRITE

        return self._upload(new_contents)
    def modify(self, modifier, backoffer=None):
        """I use a modifier callback to apply a change to the mutable file.
        I implement the following pseudocode (NOTE(review): several lines of
        the original pseudocode are not shown in this excerpt)::

         obtain_mutable_filenode_lock()
             update_servermap(MODE_WRITE)
             old = retrieve_best_version()
             new = modifier(old, servermap, first_time)
         except UncoordinatedWriteError, e:
         release_mutable_filenode_lock()

        The idea is that your modifier function can apply a delta of some
        sort, and it will be re-run as necessary until it succeeds. The
        modifier must inspect the old version to see whether its delta has
        already been applied: if so it should return the contents unmodified.

        Note that the modifier is required to run synchronously, and must not
        invoke any methods on this MutableFileNode instance.

        The backoff-er is a callable that is responsible for inserting a
        random delay between subsequent attempts, to help competing updates
        from colliding forever. It is also allowed to give up after a while.
        The backoffer is given two arguments: this MutableFileNode, and the
        Failure object that contains the UncoordinatedWriteError. It should
        return a Deferred that will fire when the next attempt should be
        made, or return the Failure if the loop should give up. If
        backoffer=None, a default one is provided which will perform
        exponential backoff, and give up after 4 tries. Note that the
        backoffer should not invoke any methods on this MutableFileNode
        instance, and it needs to be highly conscious of deadlock issues.
        """
        assert not self.is_readonly()

        return self._do_serialized(self._modify, modifier, backoffer)
847 def _modify(self, modifier, backoffer):
848 if backoffer is None:
849 backoffer = BackoffAgent().delay
850 return self._modify_and_retry(modifier, backoffer, True)
    def _modify_and_retry(self, modifier, backoffer, first_time):
        """
        I try to apply modifier to the contents of this version of the
        mutable file. If I succeed, I return an UploadResults instance
        describing my success. If I fail, I try again after waiting for
        """
        log.msg("doing modify")
        # NOTE(review): several lines of this method are not shown in this
        # excerpt; indentation below is approximate.
        d = self._update_servermap()
        # We ran into trouble; do MODE_CHECK so we're a little more
        # careful on subsequent tries.
        d = self._update_servermap(mode=MODE_CHECK)

        d.addCallback(lambda ignored:
                      self._modify_once(modifier, first_time))
        f.trap(UncoordinatedWriteError)
        # Uh oh, it broke. We're allowed to trust the servermap for our
        # first try, but after that we need to update it. It's
        # possible that we've failed due to a race with another
        # uploader, and if the race is to converge correctly, we
        # need to know about that upload.
        d2 = defer.maybeDeferred(backoffer, self, f)
        d2.addCallback(lambda ignored:
                       self._modify_and_retry(modifier,
    def _modify_once(self, modifier, first_time):
        """
        I attempt to apply a modifier to the contents of the mutable
        """
        assert self._servermap.last_update_mode != MODE_READ

        # NOTE(review): several lines of this method are not shown in this
        # excerpt; indentation below is approximate.
        # download_to_data is serialized, so we have to call this to
        d = self._try_to_download_data()
        def _apply(old_contents):
            new_contents = modifier(old_contents, self._servermap, first_time)
            precondition((isinstance(new_contents, str) or
                          new_contents is None),
                         "Modifier function must return a string "
            if new_contents is None or new_contents == old_contents:
                log.msg("no changes")
                # no changes need to be made
                # However, since Publish is not automatically doing a
                # recovery when it observes UCWE, we need to do a second
                # publish. See #551 for details. We'll basically loop until
                # we managed an uncontested publish.
                old_uploadable = MutableData(old_contents)
                new_contents = old_uploadable
            new_contents = MutableData(new_contents)
            return self._upload(new_contents)
        d.addCallback(_apply)
    def is_readonly(self):
        """
        I return True if this MutableFileVersion provides no write
        access to the file that it encapsulates, and False if it
        provides the ability to modify the file.
        """
        return self._writekey is None

    def is_mutable(self):
        """
        I return True, since mutable files are always mutable by
        """
        # (the return statement is not shown in this excerpt)

    def get_storage_index(self):
        """
        I return the storage index of the reference that I encapsulate.
        """
        return self._storage_index

        # body of get_size (its def line is not shown in this excerpt):
        """
        I return the length, in bytes, of this readable object.
        """
        return self._servermap.size_of_version(self._version)
    def download_to_data(self, fetch_privkey=False):
        """
        I return a Deferred that fires with the contents of this
        readable object as a byte string.
        """
        c = consumer.MemoryConsumer()
        d = self.read(c, fetch_privkey=fetch_privkey)
        d.addCallback(lambda mc: "".join(mc.chunks))
        # (the 'return d' line is not shown in this excerpt)

    def _try_to_download_data(self):
        """
        I am an unserialized cousin of download_to_data; I am called
        from the children of modify() to download the data associated
        with this mutable version.
        """
        c = consumer.MemoryConsumer()
        # modify will almost certainly write, so we need the privkey.
        d = self._read(c, fetch_privkey=True)
        d.addCallback(lambda mc: "".join(mc.chunks))
        # (the 'return d' line is not shown in this excerpt)

    def read(self, consumer, offset=0, size=None, fetch_privkey=False):
        """
        I read a portion (possibly all) of the mutable file that I
        reference into consumer.
        """
        # (the continuation line of this call is not shown in this excerpt)
        return self._do_serialized(self._read, consumer, offset, size,
    def _read(self, consumer, offset=0, size=None, fetch_privkey=False):
        """
        I am the serialized companion of read.
        """
        r = Retrieve(self._node, self._servermap, self._version, fetch_privkey)
        # NOTE(review): the guard around this notification (presumably
        # 'if self._history:') is not shown in this excerpt.
        self._history.notify_retrieve(r.get_status())
        d = r.download(consumer, offset, size)
        # (the 'return d' line is not shown in this excerpt)

    def _do_serialized(self, cb, *args, **kwargs):
        # note: to avoid deadlock, this callable is *not* allowed to invoke
        # other serialized methods within this (or any other)
        # MutableFileNode. The callable should be a bound method of this same
        d = defer.Deferred()
        self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
        # we need to put off d.callback until this Deferred is finished being
        # processed. Otherwise the caller's subsequent activities (like,
        # doing other things with this node) can cause reentrancy problems in
        # the Deferred code itself
        self._serializer.addBoth(lambda res: eventually(d.callback, res))
        # add a log.err just in case something really weird happens, because
        # self._serializer stays around forever, therefore we won't see the
        # usual Unhandled Error in Deferred that would give us a hint.
        self._serializer.addErrback(log.err)
        # (the 'return d' line is not shown in this excerpt)
    def _upload(self, new_contents):
        #assert self._pubkey, "update_servermap must be called before publish"
        p = Publish(self._node, self._storage_broker, self._servermap)
        # NOTE(review): the guard around this notification (presumably
        # 'if self._history:') is not shown in this excerpt.
        self._history.notify_publish(p.get_status(),
                                     new_contents.get_size())
        d = p.publish(new_contents)
        d.addCallback(self._did_upload, new_contents.get_size())
        # (the 'return d' line is not shown in this excerpt)

    def _did_upload(self, res, size):
        # remember the size we just published; presumably returns 'res' on a
        # line not shown in this excerpt — confirm.
        self._most_recent_size = size
    def update(self, data, offset):
        """
        Do an update of this mutable file version by inserting data at
        offset within the file. If offset is the EOF, this is an append
        operation. I return a Deferred that fires with the results of
        the update operation when it has completed.

        In cases where update does not append any data, or where it does
        not append so many blocks that the block count crosses a
        power-of-two boundary, this operation will use roughly
        O(data.get_size()) memory/bandwidth/CPU to perform the update.
        Otherwise, it must download, re-encode, and upload the entire
        file again, which will use O(filesize) resources.
        """
        return self._do_serialized(self._update, data, offset)

    def _update(self, data, offset):
        """
        I update the mutable file version represented by this particular
        IMutableVersion by inserting the data in data at the offset
        offset. I return a Deferred that fires when this has been
        """
        # NOTE(review): some lines (the div_ceil denominator arguments and
        # the final return) are not shown in this excerpt.
        new_size = data.get_size() + offset
        old_size = self.get_size()
        segment_size = self._version[3]
        num_old_segments = mathutil.div_ceil(old_size,
        num_new_segments = mathutil.div_ceil(new_size,
        log.msg("got %d old segments, %d new segments" % \
                (num_old_segments, num_new_segments))

        # We do a whole file re-encode if the file is an SDMF file.
        if self._version[2]: # version[2] == SDMF salt, which MDMF lacks
            log.msg("doing re-encode instead of in-place update")
            return self._do_modify_update(data, offset)

        # Otherwise, we can replace just the parts that are changing.
        log.msg("updating in place")
        d = self._do_update_update(data, offset)
        d.addCallback(self._decode_and_decrypt_segments, data, offset)
        d.addCallback(self._build_uploadable_and_finish, data, offset)
1079 def _do_modify_update(self, data, offset):
1081 I perform a file update by modifying the contents of the file
1082 after downloading it, then reuploading it. I am less efficient
1083 than _do_update_update, but am necessary for certain updates.
1085 def m(old, servermap, first_time):
1087 rest = offset + data.get_size()
1089 new += "".join(data.read(data.get_size()))
1092 return self._modify(m, None)
1095 def _do_update_update(self, data, offset):
1097 I start the Servermap update that gets us the data we need to
1098 continue the update process. I return a Deferred that fires when
1099 the servermap update is done.
1101 assert IMutableUploadable.providedBy(data)
1102 assert self.is_mutable()
1103 # offset == self.get_size() is valid and means that we are
1104 # appending data to the file.
1105 assert offset <= self.get_size()
1107 segsize = self._version[3]
1108 # We'll need the segment that the data starts in, regardless of
1109 # what we'll do later.
1110 start_segment = offset // segsize
1112 # We only need the end segment if the data we append does not go
1113 # beyond the current end-of-file.
1114 end_segment = start_segment
1115 if offset + data.get_size() < self.get_size():
1116 end_data = offset + data.get_size()
1117 # The last byte we touch is the end_data'th byte, which is actually
1118 # byte end_data - 1 because bytes are zero-indexed.
1120 end_segment = end_data // segsize
1122 self._start_segment = start_segment
1123 self._end_segment = end_segment
1125 # Now ask for the servermap to be updated in MODE_WRITE with
1126 # this update range.
1127 return self._update_servermap(update_range=(start_segment,
1131 def _decode_and_decrypt_segments(self, ignored, data, offset):
1133 After the servermap update, I take the encrypted and encoded
1134 data that the servermap fetched while doing its update and
1135 transform it into decoded-and-decrypted plaintext that can be
1136 used by the new uploadable. I return a Deferred that fires with
1139 r = Retrieve(self._node, self._servermap, self._version)
1140 # decode: takes in our blocks and salts from the servermap,
1141 # returns a Deferred that fires with the corresponding plaintext
1142 # segments. Does not download -- simply takes advantage of
1143 # existing infrastructure within the Retrieve class to avoid
1145 sm = self._servermap
1146 # XXX: If the methods in the servermap don't work as
1147 # abstractions, you should rewrite them instead of going around
1149 update_data = sm.update_data
1150 start_segments = {} # shnum -> start segment
1151 end_segments = {} # shnum -> end segment
1152 blockhashes = {} # shnum -> blockhash tree
1153 for (shnum, data) in update_data.iteritems():
1154 data = [d[1] for d in data if d[0] == self._version]
1156 # Every data entry in our list should now be share shnum for
1157 # a particular version of the mutable file, so all of the
1158 # entries should be identical.
1160 assert filter(lambda x: x != datum, data) == []
1162 blockhashes[shnum] = datum[0]
1163 start_segments[shnum] = datum[1]
1164 end_segments[shnum] = datum[2]
1166 d1 = r.decode(start_segments, self._start_segment)
1167 d2 = r.decode(end_segments, self._end_segment)
1168 d3 = defer.succeed(blockhashes)
1169 return deferredutil.gatherResults([d1, d2, d3])
1172 def _build_uploadable_and_finish(self, segments_and_bht, data, offset):
1174 After the process has the plaintext segments, I build the
1175 TransformingUploadable that the publisher will eventually
1176 re-upload to the grid. I then invoke the publisher with that
1177 uploadable, and return a Deferred when the publish operation has
1178 completed without issue.
1180 u = TransformingUploadable(data, offset,
1182 segments_and_bht[0],
1183 segments_and_bht[1])
1184 p = Publish(self._node, self._storage_broker, self._servermap)
1185 return p.update(u, offset, segments_and_bht[2], self._version)
1188 def _update_servermap(self, mode=MODE_WRITE, update_range=None):
1190 I update the servermap. I return a Deferred that fires when the
1191 servermap update is done.
1194 u = ServermapUpdater(self._node, self._storage_broker, Monitor(),
1197 update_range=update_range)
1199 u = ServermapUpdater(self._node, self._storage_broker, Monitor(),