import random

from zope.interface import implements
from twisted.internet import defer, reactor
from foolscap.api import eventually
from allmydata.interfaces import IMutableFileNode, ICheckable, ICheckResults, \
     NotEnoughSharesError, MDMF_VERSION, SDMF_VERSION, IMutableUploadable, \
     IMutableFileVersion, IWriteable
from allmydata.util import hashutil, log, consumer, deferredutil, mathutil
from allmydata.util.assertutil import precondition
from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI, \
                          WriteableMDMFFileURI, ReadonlyMDMFFileURI
from allmydata.monitor import Monitor
from pycryptopp.cipher.aes import AES

from allmydata.mutable.publish import Publish, MutableData,\
                                      TransformingUploadable
from allmydata.mutable.common import MODE_READ, MODE_WRITE, MODE_CHECK, UnrecoverableFileError, \
     ResponseCache, UncoordinatedWriteError
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
from allmydata.mutable.retrieve import Retrieve
from allmydata.mutable.checker import MutableChecker, MutableCheckAndRepairer
from allmydata.mutable.repairer import Repairer
class BackoffAgent:
    """Default backoffer for MutableFileVersion.modify(): waits an
    exponentially-growing, jittered delay between retries, giving up
    after maxRetries attempts."""
    # these parameters are copied from foolscap.reconnector, which gets them
    # from twisted.internet.protocol.ReconnectingClientFactory
    initialDelay = 1.0
    factor = 2.7182818284590451 # (math.e)
    jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
    maxRetries = 4

    def __init__(self):
        self._delay = self.initialDelay
        self._retries = 0

    def delay(self, node, f):
        self._retries += 1
        if self._retries > self.maxRetries:
            return f # give up
        self._delay = self._delay * self.factor
        self._delay = random.normalvariate(self._delay,
                                           self._delay * self.jitter)
        d = defer.Deferred()
        reactor.callLater(self._delay, d.callback, None)
        return d
# use nodemaker.create_mutable_file() to make one of these
class MutableFileNode:
    implements(IMutableFileNode, ICheckable)
54 def __init__(self, storage_broker, secret_holder,
55 default_encoding_parameters, history):
56 self._storage_broker = storage_broker
57 self._secret_holder = secret_holder
58 self._default_encoding_parameters = default_encoding_parameters
59 self._history = history
60 self._pubkey = None # filled in upon first read
61 self._privkey = None # filled in if we're mutable
62 # we keep track of the last encoding parameters that we use. These
63 # are updated upon retrieve, and used by publish. If we publish
64 # without ever reading (i.e. overwrite()), then we use these values.
65 self._required_shares = default_encoding_parameters["k"]
66 self._total_shares = default_encoding_parameters["n"]
67 self._sharemap = {} # known shares, shnum-to-[nodeids]
68 self._cache = ResponseCache()
69 self._most_recent_size = None
70 # filled in after __init__ if we're being created for the first time;
71 # filled in by the servermap updater before publishing, otherwise.
72 # set to this default value in case neither of those things happen,
73 # or in case the servermap can't find any shares to tell us what
75 self._protocol_version = None
77 # all users of this MutableFileNode go through the serializer. This
78 # takes advantage of the fact that Deferreds discard the callbacks
79 # that they're done with, so we can keep using the same Deferred
80 # forever without consuming more and more memory.
81 self._serializer = defer.succeed(None)
83 # Starting with MDMF, we can get these from caps if they're
84 # there. Leave them alone for now; they'll be filled in by my
85 # init_from_cap method if necessary.
86 self._downloader_hints = {}
89 if hasattr(self, '_uri'):
90 return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
92 return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)
94 def init_from_cap(self, filecap):
95 # we have the URI, but we have not yet retrieved the public
96 # verification key, nor things like 'k' or 'N'. If and when someone
97 # wants to get our contents, we'll pull from shares and fill those
99 if isinstance(filecap, (WriteableMDMFFileURI, ReadonlyMDMFFileURI)):
100 self._protocol_version = MDMF_VERSION
101 elif isinstance(filecap, (ReadonlySSKFileURI, WriteableSSKFileURI)):
102 self._protocol_version = SDMF_VERSION
105 self._writekey = None
107 if not filecap.is_readonly() and filecap.is_mutable():
108 self._writekey = self._uri.writekey
109 self._readkey = self._uri.readkey
110 self._storage_index = self._uri.storage_index
111 self._fingerprint = self._uri.fingerprint
112 # the following values are learned during Retrieval
114 # self._required_shares
116 # and these are needed for Publish. They are filled in by Retrieval
117 # if possible, otherwise by the first peer that Publish talks to.
119 self._encprivkey = None
123 def create_with_keys(self, (pubkey, privkey), contents,
124 version=SDMF_VERSION):
125 """Call this to create a brand-new mutable file. It will create the
126 shares, find homes for them, and upload the initial contents (created
127 with the same rules as IClient.create_mutable_file() ). Returns a
128 Deferred that fires (with the MutableFileNode instance you should
129 use) when it completes.
131 self._pubkey, self._privkey = pubkey, privkey
132 pubkey_s = self._pubkey.serialize()
133 privkey_s = self._privkey.serialize()
134 self._writekey = hashutil.ssk_writekey_hash(privkey_s)
135 self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
136 self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
137 if version == MDMF_VERSION:
138 self._uri = WriteableMDMFFileURI(self._writekey, self._fingerprint)
139 self._protocol_version = version
140 elif version == SDMF_VERSION:
141 self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
142 self._protocol_version = version
143 self._readkey = self._uri.readkey
144 self._storage_index = self._uri.storage_index
145 initial_contents = self._get_initial_contents(contents)
146 return self._upload(initial_contents, None)
148 def _get_initial_contents(self, contents):
150 return MutableData("")
152 if isinstance(contents, str):
153 return MutableData(contents)
155 if IMutableUploadable.providedBy(contents):
158 assert callable(contents), "%s should be callable, not %s" % \
159 (contents, type(contents))
160 return contents(self)
162 def _encrypt_privkey(self, writekey, privkey):
164 crypttext = enc.process(privkey)
167 def _decrypt_privkey(self, enc_privkey):
168 enc = AES(self._writekey)
169 privkey = enc.process(enc_privkey)
172 def _populate_pubkey(self, pubkey):
173 self._pubkey = pubkey
174 def _populate_required_shares(self, required_shares):
175 self._required_shares = required_shares
176 def _populate_total_shares(self, total_shares):
177 self._total_shares = total_shares
179 def _populate_privkey(self, privkey):
180 self._privkey = privkey
181 def _populate_encprivkey(self, encprivkey):
182 self._encprivkey = encprivkey
183 def _add_to_cache(self, verinfo, shnum, offset, data):
184 self._cache.add(verinfo, shnum, offset, data)
185 def _read_from_cache(self, verinfo, shnum, offset, length):
186 return self._cache.read(verinfo, shnum, offset, length)
188 def get_write_enabler(self, server):
189 seed = server.get_foolscap_write_enabler_seed()
190 assert len(seed) == 20
191 return hashutil.ssk_write_enabler_hash(self._writekey, seed)
192 def get_renewal_secret(self, server):
193 crs = self._secret_holder.get_renewal_secret()
194 frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
195 lease_seed = server.get_lease_seed()
196 assert len(lease_seed) == 20
197 return hashutil.bucket_renewal_secret_hash(frs, lease_seed)
198 def get_cancel_secret(self, server):
199 ccs = self._secret_holder.get_cancel_secret()
200 fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
201 lease_seed = server.get_lease_seed()
202 assert len(lease_seed) == 20
203 return hashutil.bucket_cancel_secret_hash(fcs, lease_seed)
205 def get_writekey(self):
206 return self._writekey
207 def get_readkey(self):
209 def get_storage_index(self):
210 return self._storage_index
211 def get_fingerprint(self):
212 return self._fingerprint
213 def get_privkey(self):
215 def get_encprivkey(self):
216 return self._encprivkey
217 def get_pubkey(self):
220 def get_required_shares(self):
221 return self._required_shares
222 def get_total_shares(self):
223 return self._total_shares
225 ####################################
229 return self._most_recent_size
231 def get_current_size(self):
232 d = self.get_size_of_best_version()
233 d.addCallback(self._stash_size)
236 def _stash_size(self, size):
237 self._most_recent_size = size
242 def get_readcap(self):
243 return self._uri.get_readonly()
244 def get_verify_cap(self):
245 return self._uri.get_verify_cap()
246 def get_repair_cap(self):
247 if self._uri.is_readonly():
252 return self._uri.to_string()
254 def get_write_uri(self):
255 if self.is_readonly():
257 return self._uri.to_string()
259 def get_readonly_uri(self):
260 return self._uri.get_readonly().to_string()
262 def get_readonly(self):
263 if self.is_readonly():
265 ro = MutableFileNode(self._storage_broker, self._secret_holder,
266 self._default_encoding_parameters, self._history)
267 ro.init_from_cap(self._uri.get_readonly())
270 def is_mutable(self):
271 return self._uri.is_mutable()
273 def is_readonly(self):
274 return self._uri.is_readonly()
276 def is_unknown(self):
279 def is_allowed_in_immutable_directory(self):
280 return not self._uri.is_mutable()
282 def raise_error(self):
286 return hash((self.__class__, self._uri))
287 def __cmp__(self, them):
288 if cmp(type(self), type(them)):
289 return cmp(type(self), type(them))
290 if cmp(self.__class__, them.__class__):
291 return cmp(self.__class__, them.__class__)
292 return cmp(self._uri, them._uri)
295 #################################
298 def check(self, monitor, verify=False, add_lease=False):
299 checker = MutableChecker(self, self._storage_broker,
300 self._history, monitor)
301 return checker.check(verify, add_lease)
303 def check_and_repair(self, monitor, verify=False, add_lease=False):
304 checker = MutableCheckAndRepairer(self, self._storage_broker,
305 self._history, monitor)
306 return checker.check(verify, add_lease)
308 #################################
311 def repair(self, check_results, force=False):
312 assert ICheckResults(check_results)
313 r = Repairer(self, check_results)
318 #################################
321 def get_best_readable_version(self):
323 I return a Deferred that fires with a MutableFileVersion
324 representing the best readable version of the file that I
327 return self.get_readable_version()
330 def get_readable_version(self, servermap=None, version=None):
332 I return a Deferred that fires with an MutableFileVersion for my
333 version argument, if there is a recoverable file of that version
334 on the grid. If there is no recoverable version, I fire with an
335 UnrecoverableFileError.
337 If a servermap is provided, I look in there for the requested
338 version. If no servermap is provided, I create and update a new
341 If no version is provided, then I return a MutableFileVersion
342 representing the best recoverable version of the file.
344 d = self._get_version_from_servermap(MODE_READ, servermap, version)
345 def _build_version((servermap, their_version)):
346 assert their_version in servermap.recoverable_versions()
347 assert their_version in servermap.make_versionmap()
349 mfv = MutableFileVersion(self,
353 self._storage_broker,
355 history=self._history)
356 assert mfv.is_readonly()
357 mfv.set_downloader_hints(self._downloader_hints)
358 # our caller can use this to download the contents of the
361 return d.addCallback(_build_version)
364 def _get_version_from_servermap(self,
369 I return a Deferred that fires with (servermap, version).
371 This function performs validation and a servermap update. If it
372 returns (servermap, version), the caller can assume that:
373 - servermap was last updated in mode.
374 - version is recoverable, and corresponds to the servermap.
376 If version and servermap are provided to me, I will validate
377 that version exists in the servermap, and that the servermap was
380 If version is not provided, but servermap is, I will validate
381 the servermap and return the best recoverable version that I can
382 find in the servermap.
384 If the version is provided but the servermap isn't, I will
385 obtain a servermap that has been updated in the correct mode and
386 validate that version is found and recoverable.
388 If neither servermap nor version are provided, I will obtain a
389 servermap updated in the correct mode, and return the best
390 recoverable version that I can find in there.
393 if servermap and servermap.get_last_update()[0] == mode:
394 d = defer.succeed(servermap)
396 d = self._get_servermap(mode)
398 def _get_version(servermap, v):
399 if v and v not in servermap.recoverable_versions():
402 v = servermap.best_recoverable_version()
404 raise UnrecoverableFileError("no recoverable versions")
406 return (servermap, v)
407 return d.addCallback(_get_version, version)
410 def download_best_version(self):
412 I return a Deferred that fires with the contents of the best
413 version of this mutable file.
415 return self._do_serialized(self._download_best_version)
418 def _download_best_version(self):
420 I am the serialized sibling of download_best_version.
422 d = self.get_best_readable_version()
423 d.addCallback(self._record_size)
424 d.addCallback(lambda version: version.download_to_data())
426 # It is possible that the download will fail because there
427 # aren't enough shares to be had. If so, we will try again after
428 # updating the servermap in MODE_WRITE, which may find more
429 # shares than updating in MODE_READ, as we just did. We can do
430 # this by getting the best mutable version and downloading from
431 # that -- the best mutable version will be a MutableFileVersion
432 # with a servermap that was last updated in MODE_WRITE, as we
433 # want. If this fails, then we give up.
434 def _maybe_retry(failure):
435 failure.trap(NotEnoughSharesError)
437 d = self.get_best_mutable_version()
438 d.addCallback(self._record_size)
439 d.addCallback(lambda version: version.download_to_data())
442 d.addErrback(_maybe_retry)
446 def _record_size(self, mfv):
448 I record the size of a mutable file version.
450 self._most_recent_size = mfv.get_size()
454 def get_size_of_best_version(self):
456 I return the size of the best version of this mutable file.
458 This is equivalent to calling get_size() on the result of
459 get_best_readable_version().
461 d = self.get_best_readable_version()
462 return d.addCallback(lambda mfv: mfv.get_size())
465 #################################
468 def get_best_mutable_version(self, servermap=None):
470 I return a Deferred that fires with a MutableFileVersion
471 representing the best readable version of the file that I
472 represent. I am like get_best_readable_version, except that I
473 will try to make a writeable version if I can.
475 return self.get_mutable_version(servermap=servermap)
478 def get_mutable_version(self, servermap=None, version=None):
480 I return a version of this mutable file. I return a Deferred
481 that fires with a MutableFileVersion
483 If version is provided, the Deferred will fire with a
484 MutableFileVersion initailized with that version. Otherwise, it
485 will fire with the best version that I can recover.
487 If servermap is provided, I will use that to find versions
488 instead of performing my own servermap update.
490 if self.is_readonly():
491 return self.get_readable_version(servermap=servermap,
494 # get_mutable_version => write intent, so we require that the
495 # servermap is updated in MODE_WRITE
496 d = self._get_version_from_servermap(MODE_WRITE, servermap, version)
497 def _build_version((servermap, smap_version)):
498 # these should have been set by the servermap update.
499 assert self._secret_holder
500 assert self._writekey
502 mfv = MutableFileVersion(self,
506 self._storage_broker,
510 history=self._history)
511 assert not mfv.is_readonly()
512 mfv.set_downloader_hints(self._downloader_hints)
515 return d.addCallback(_build_version)
518 # XXX: I'm uncomfortable with the difference between upload and
519 # overwrite, which, FWICT, is basically that you don't have to
520 # do a servermap update before you overwrite. We split them up
521 # that way anyway, so I guess there's no real difficulty in
522 # offering both ways to callers, but it also makes the
523 # public-facing API cluttery, and makes it hard to discern the
524 # right way of doing things.
526 # In general, we leave it to callers to ensure that they aren't
527 # going to cause UncoordinatedWriteErrors when working with
528 # MutableFileVersions. We know that the next three operations
529 # (upload, overwrite, and modify) will all operate on the same
530 # version, so we say that only one of them can be going on at once,
531 # and serialize them to ensure that that actually happens, since as
532 # the caller in this situation it is our job to do that.
533 def overwrite(self, new_contents):
535 I overwrite the contents of the best recoverable version of this
536 mutable file with new_contents. This is equivalent to calling
537 overwrite on the result of get_best_mutable_version with
538 new_contents as an argument. I return a Deferred that eventually
539 fires with the results of my replacement process.
541 # TODO: Update downloader hints.
542 return self._do_serialized(self._overwrite, new_contents)
545 def _overwrite(self, new_contents):
547 I am the serialized sibling of overwrite.
549 d = self.get_best_mutable_version()
550 d.addCallback(lambda mfv: mfv.overwrite(new_contents))
551 d.addCallback(self._did_upload, new_contents.get_size())
555 def upload(self, new_contents, servermap):
557 I overwrite the contents of the best recoverable version of this
558 mutable file with new_contents, using servermap instead of
559 creating/updating our own servermap. I return a Deferred that
560 fires with the results of my upload.
562 # TODO: Update downloader hints
563 return self._do_serialized(self._upload, new_contents, servermap)
566 def modify(self, modifier, backoffer=None):
568 I modify the contents of the best recoverable version of this
569 mutable file with the modifier. This is equivalent to calling
570 modify on the result of get_best_mutable_version. I return a
571 Deferred that eventually fires with an UploadResults instance
572 describing this process.
574 # TODO: Update downloader hints.
575 return self._do_serialized(self._modify, modifier, backoffer)
578 def _modify(self, modifier, backoffer):
580 I am the serialized sibling of modify.
582 d = self.get_best_mutable_version()
583 d.addCallback(lambda mfv: mfv.modify(modifier, backoffer))
587 def download_version(self, servermap, version, fetch_privkey=False):
589 Download the specified version of this mutable file. I return a
590 Deferred that fires with the contents of the specified version
591 as a bytestring, or errbacks if the file is not recoverable.
593 d = self.get_readable_version(servermap, version)
594 return d.addCallback(lambda mfv: mfv.download_to_data(fetch_privkey))
597 def get_servermap(self, mode):
599 I return a servermap that has been updated in mode.
601 mode should be one of MODE_READ, MODE_WRITE, MODE_CHECK or
602 MODE_ANYTHING. See servermap.py for more on what these mean.
604 return self._do_serialized(self._get_servermap, mode)
607 def _get_servermap(self, mode):
609 I am a serialized twin to get_servermap.
611 servermap = ServerMap()
612 d = self._update_servermap(servermap, mode)
613 # The servermap will tell us about the most recent size of the
614 # file, so we may as well set that so that callers might get
615 # more data about us.
616 if not self._most_recent_size:
617 d.addCallback(self._get_size_from_servermap)
621 def _get_size_from_servermap(self, servermap):
623 I extract the size of the best version of this file and record
624 it in self._most_recent_size. I return the servermap that I was
627 if servermap.recoverable_versions():
628 v = servermap.best_recoverable_version()
629 size = v[4] # verinfo[4] == size
630 self._most_recent_size = size
634 def _update_servermap(self, servermap, mode):
635 u = ServermapUpdater(self, self._storage_broker, Monitor(), servermap,
638 self._history.notify_mapupdate(u.get_status())
642 #def set_version(self, version):
643 # I can be set in two ways:
644 # 1. When the node is created.
645 # 2. (for an existing share) when the Servermap is updated
647 # assert version in (MDMF_VERSION, SDMF_VERSION)
648 # self._protocol_version = version
651 def get_version(self):
652 return self._protocol_version
655 def _do_serialized(self, cb, *args, **kwargs):
656 # note: to avoid deadlock, this callable is *not* allowed to invoke
657 # other serialized methods within this (or any other)
658 # MutableFileNode. The callable should be a bound method of this same
661 self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
662 # we need to put off d.callback until this Deferred is finished being
663 # processed. Otherwise the caller's subsequent activities (like,
664 # doing other things with this node) can cause reentrancy problems in
665 # the Deferred code itself
666 self._serializer.addBoth(lambda res: eventually(d.callback, res))
667 # add a log.err just in case something really weird happens, because
668 # self._serializer stays around forever, therefore we won't see the
669 # usual Unhandled Error in Deferred that would give us a hint.
670 self._serializer.addErrback(log.err)
674 def _upload(self, new_contents, servermap):
676 A MutableFileNode still has to have some way of getting
677 published initially, which is what I am here for. After that,
678 all publishing, updating, modifying and so on happens through
681 assert self._pubkey, "update_servermap must be called before publish"
683 # Define IPublishInvoker with a set_downloader_hints method?
684 # Then have the publisher call that method when it's done publishing?
685 p = Publish(self, self._storage_broker, servermap)
687 self._history.notify_publish(p.get_status(),
688 new_contents.get_size())
689 d = p.publish(new_contents)
690 d.addCallback(self._did_upload, new_contents.get_size())
694 def set_downloader_hints(self, hints):
695 self._downloader_hints = hints
697 def _did_upload(self, res, size):
698 self._most_recent_size = size
class MutableFileVersion:
    """
    I represent a specific version (most likely the best version) of a
    mutable file.

    Since I implement IReadable, instances which hold a
    reference to an instance of me are guaranteed the ability (absent
    connection difficulties or unrecoverable versions) to read the file
    that I represent. Depending on whether I was initialized with a
    write capability or not, I may also provide callers the ability to
    overwrite or modify the contents of the mutable file that I
    reference.
    """
    implements(IMutableFileVersion, IWriteable)
729 self._servermap = servermap
730 self._version = version
731 self._storage_index = storage_index
732 self._write_secrets = write_secrets
733 self._history = history
734 self._storage_broker = storage_broker
736 #assert isinstance(readcap, IURI)
737 self._readcap = readcap
739 self._writekey = writekey
740 self._serializer = defer.succeed(None)
743 def get_sequence_number(self):
745 Get the sequence number of the mutable version that I represent.
747 return self._version[0] # verinfo[0] == the sequence number
751 def get_writekey(self):
753 I return a writekey or None if I don't have a writekey.
755 return self._writekey
758 def set_downloader_hints(self, hints):
760 I set the downloader hints.
762 assert isinstance(hints, dict)
764 self._downloader_hints = hints
767 def get_downloader_hints(self):
769 I return the downloader hints.
771 return self._downloader_hints
774 def overwrite(self, new_contents):
776 I overwrite the contents of this mutable file version with the
777 data in new_contents.
779 assert not self.is_readonly()
781 return self._do_serialized(self._overwrite, new_contents)
784 def _overwrite(self, new_contents):
785 assert IMutableUploadable.providedBy(new_contents)
786 assert self._servermap.get_last_update()[0] == MODE_WRITE
788 return self._upload(new_contents)
791 def modify(self, modifier, backoffer=None):
792 """I use a modifier callback to apply a change to the mutable file.
793 I implement the following pseudocode::
795 obtain_mutable_filenode_lock()
798 update_servermap(MODE_WRITE)
799 old = retrieve_best_version()
800 new = modifier(old, servermap, first_time)
805 except UncoordinatedWriteError, e:
809 release_mutable_filenode_lock()
811 The idea is that your modifier function can apply a delta of some
812 sort, and it will be re-run as necessary until it succeeds. The
813 modifier must inspect the old version to see whether its delta has
814 already been applied: if so it should return the contents unmodified.
816 Note that the modifier is required to run synchronously, and must not
817 invoke any methods on this MutableFileNode instance.
819 The backoff-er is a callable that is responsible for inserting a
820 random delay between subsequent attempts, to help competing updates
821 from colliding forever. It is also allowed to give up after a while.
822 The backoffer is given two arguments: this MutableFileNode, and the
823 Failure object that contains the UncoordinatedWriteError. It should
824 return a Deferred that will fire when the next attempt should be
825 made, or return the Failure if the loop should give up. If
826 backoffer=None, a default one is provided which will perform
827 exponential backoff, and give up after 4 tries. Note that the
828 backoffer should not invoke any methods on this MutableFileNode
829 instance, and it needs to be highly conscious of deadlock issues.
831 assert not self.is_readonly()
833 return self._do_serialized(self._modify, modifier, backoffer)
836 def _modify(self, modifier, backoffer):
837 if backoffer is None:
838 backoffer = BackoffAgent().delay
839 return self._modify_and_retry(modifier, backoffer, True)
842 def _modify_and_retry(self, modifier, backoffer, first_time):
844 I try to apply modifier to the contents of this version of the
845 mutable file. If I succeed, I return an UploadResults instance
846 describing my success. If I fail, I try again after waiting for
849 log.msg("doing modify")
851 d = self._update_servermap()
853 # We ran into trouble; do MODE_CHECK so we're a little more
854 # careful on subsequent tries.
855 d = self._update_servermap(mode=MODE_CHECK)
857 d.addCallback(lambda ignored:
858 self._modify_once(modifier, first_time))
860 f.trap(UncoordinatedWriteError)
861 # Uh oh, it broke. We're allowed to trust the servermap for our
862 # first try, but after that we need to update it. It's
863 # possible that we've failed due to a race with another
864 # uploader, and if the race is to converge correctly, we
865 # need to know about that upload.
866 d2 = defer.maybeDeferred(backoffer, self, f)
867 d2.addCallback(lambda ignored:
868 self._modify_and_retry(modifier,
875 def _modify_once(self, modifier, first_time):
877 I attempt to apply a modifier to the contents of the mutable
880 assert self._servermap.get_last_update()[0] != MODE_READ
882 # download_to_data is serialized, so we have to call this to
884 d = self._try_to_download_data()
885 def _apply(old_contents):
886 new_contents = modifier(old_contents, self._servermap, first_time)
887 precondition((isinstance(new_contents, str) or
888 new_contents is None),
889 "Modifier function must return a string "
892 if new_contents is None or new_contents == old_contents:
893 log.msg("no changes")
894 # no changes need to be made
897 # However, since Publish is not automatically doing a
898 # recovery when it observes UCWE, we need to do a second
899 # publish. See #551 for details. We'll basically loop until
900 # we managed an uncontested publish.
901 old_uploadable = MutableData(old_contents)
902 new_contents = old_uploadable
904 new_contents = MutableData(new_contents)
906 return self._upload(new_contents)
907 d.addCallback(_apply)
911 def is_readonly(self):
913 I return True if this MutableFileVersion provides no write
914 access to the file that it encapsulates, and False if it
915 provides the ability to modify the file.
917 return self._writekey is None
920 def is_mutable(self):
922 I return True, since mutable files are always mutable by
928 def get_storage_index(self):
930 I return the storage index of the reference that I encapsulate.
932 return self._storage_index
937 I return the length, in bytes, of this readable object.
939 return self._servermap.size_of_version(self._version)
942 def download_to_data(self, fetch_privkey=False):
944 I return a Deferred that fires with the contents of this
945 readable object as a byte string.
948 c = consumer.MemoryConsumer()
949 d = self.read(c, fetch_privkey=fetch_privkey)
950 d.addCallback(lambda mc: "".join(mc.chunks))
954 def _try_to_download_data(self):
956 I am an unserialized cousin of download_to_data; I am called
957 from the children of modify() to download the data associated
958 with this mutable version.
960 c = consumer.MemoryConsumer()
961 # modify will almost certainly write, so we need the privkey.
962 d = self._read(c, fetch_privkey=True)
963 d.addCallback(lambda mc: "".join(mc.chunks))
967 def read(self, consumer, offset=0, size=None, fetch_privkey=False):
969 I read a portion (possibly all) of the mutable file that I
970 reference into consumer.
972 return self._do_serialized(self._read, consumer, offset, size,
976 def _read(self, consumer, offset=0, size=None, fetch_privkey=False):
978 I am the serialized companion of read.
980 r = Retrieve(self._node, self._storage_broker, self._servermap,
981 self._version, fetch_privkey)
983 self._history.notify_retrieve(r.get_status())
984 d = r.download(consumer, offset, size)
988 def _do_serialized(self, cb, *args, **kwargs):
989 # note: to avoid deadlock, this callable is *not* allowed to invoke
990 # other serialized methods within this (or any other)
991 # MutableFileNode. The callable should be a bound method of this same
994 self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
995 # we need to put off d.callback until this Deferred is finished being
996 # processed. Otherwise the caller's subsequent activities (like,
997 # doing other things with this node) can cause reentrancy problems in
998 # the Deferred code itself
999 self._serializer.addBoth(lambda res: eventually(d.callback, res))
1000 # add a log.err just in case something really weird happens, because
1001 # self._serializer stays around forever, therefore we won't see the
1002 # usual Unhandled Error in Deferred that would give us a hint.
1003 self._serializer.addErrback(log.err)
1007 def _upload(self, new_contents):
1008 #assert self._pubkey, "update_servermap must be called before publish"
1009 p = Publish(self._node, self._storage_broker, self._servermap)
1011 self._history.notify_publish(p.get_status(),
1012 new_contents.get_size())
1013 d = p.publish(new_contents)
1014 d.addCallback(self._did_upload, new_contents.get_size())
1018 def _did_upload(self, res, size):
1019 self._most_recent_size = size
1022 def update(self, data, offset):
1024 Do an update of this mutable file version by inserting data at
1025 offset within the file. If offset is the EOF, this is an append
1026 operation. I return a Deferred that fires with the results of
1027 the update operation when it has completed.
1029 In cases where update does not append any data, or where it does
1030 not append so many blocks that the block count crosses a
1031 power-of-two boundary, this operation will use roughly
1032 O(data.get_size()) memory/bandwidth/CPU to perform the update.
1033 Otherwise, it must download, re-encode, and upload the entire
1034 file again, which will use O(filesize) resources.
1036 return self._do_serialized(self._update, data, offset)
# Dispatcher for update(): choose between a full re-encode (SDMF) and an
# in-place segment update (MDMF). NOTE(review): several original lines are
# elided here — the docstring delimiters, the second argument of each
# div_ceil call (presumably segment_size), and the final `return d` of the
# in-place branch. Confirm against the full source.
1039 def _update(self, data, offset):
1041 I update the mutable file version represented by this particular
1042 IMutableVersion by inserting the data in data at the offset
1043 offset. I return a Deferred that fires when this has been
# Sizes before and after the insertion; offset + data length is the new
# end-of-file for an append or tail-overwrite.
1046 new_size = data.get_size() + offset
1047 old_size = self.get_size()
# self._version is a verinfo tuple; index 3 holds the segment size
# (see the same indexing in _do_update_update).
1048 segment_size = self._version[3]
1049 num_old_segments = mathutil.div_ceil(old_size,
1051 num_new_segments = mathutil.div_ceil(new_size,
1053 log.msg("got %d old segments, %d new segments" % \
1054 (num_old_segments, num_new_segments))
1056 # We do a whole file re-encode if the file is an SDMF file.
1057 if self._version[2]: # version[2] == SDMF salt, which MDMF lacks
1058 log.msg("doing re-encode instead of in-place update")
1059 return self._do_modify_update(data, offset)
1061 # Otherwise, we can replace just the parts that are changing.
1062 log.msg("updating in place")
# Three-stage pipeline: update the servermap for the affected range,
# decode/decrypt the boundary segments, then build the uploadable
# and publish it.
1063 d = self._do_update_update(data, offset)
1064 d.addCallback(self._decode_and_decrypt_segments, data, offset)
1065 d.addCallback(self._build_uploadable_and_finish, data, offset)
# Fallback path used for SDMF files (see _update): download the whole
# file, splice the new data in as a string, and re-upload via _modify.
# NOTE(review): most of the modifier body is elided from this listing
# (the docstring delimiters and the lines that build `new` from `old`
# before the visible `new += ...`); the splice logic cannot be verified
# from what is shown — confirm against the full source.
1069 def _do_modify_update(self, data, offset):
1071 I perform a file update by modifying the contents of the file
1072 after downloading it, then reuploading it. I am less efficient
1073 than _do_update_update, but am necessary for certain updates.
# Modifier callback handed to _modify(); receives the current plaintext
# `old` plus the servermap, and returns the new plaintext.
1075 def m(old, servermap, first_time):
1077 rest = offset + data.get_size()
# data is an IMutableUploadable; read() yields chunks that are joined
# into the replacement bytes.
1079 new += "".join(data.read(data.get_size()))
# No second argument: default behavior of _modify (no initial contents
# maker).
1082 return self._modify(m, None)
# Stage 1 of the in-place update pipeline (see _update). NOTE(review):
# the docstring delimiters and the closing continuation line of the
# final _update_servermap(...) call (presumably `end_segment))`) are
# elided from this listing.
1085 def _do_update_update(self, data, offset):
1087 I start the Servermap update that gets us the data we need to
1088 continue the update process. I return a Deferred that fires when
1089 the servermap update is done.
# Preconditions: we need a real uploadable, a mutable file, and an
# offset inside (or exactly at the end of) the current contents.
1091 assert IMutableUploadable.providedBy(data)
1092 assert self.is_mutable()
1093 # offset == self.get_size() is valid and means that we are
1094 # appending data to the file.
1095 assert offset <= self.get_size()
# verinfo index 3 is the segment size (same indexing as in _update).
1097 segsize = self._version[3]
1098 # We'll need the segment that the data starts in, regardless of
1099 # what we'll do later.
1100 start_segment = offset // segsize
1102 # We only need the end segment if the data we append does not go
1103 # beyond the current end-of-file.
1104 end_segment = start_segment
1105 if offset + data.get_size() < self.get_size():
1106 end_data = offset + data.get_size()
1107 # The last byte we touch is the end_data'th byte, which is actually
1108 # byte end_data - 1 because bytes are zero-indexed.
# (An elided line between 1108 and 1110 presumably decrements
# end_data accordingly — TODO confirm.)
1110 end_segment = end_data // segsize
# Stash the boundary segments for the later pipeline stages
# (_decode_and_decrypt_segments reads these).
1112 self._start_segment = start_segment
1113 self._end_segment = end_segment
1115 # Now ask for the servermap to be updated in MODE_WRITE with
1116 # this update range.
1117 return self._update_servermap(update_range=(start_segment,
# Stage 2 of the in-place update pipeline (see _update). NOTE(review):
# elided lines here include the docstring delimiters, the trailing
# arguments of the Retrieve(...) constructor, the tails of two comment
# blocks, and the `datum = ...` assignment (presumably `datum = data[0]`,
# given the assert that compares every entry to it) — confirm against the
# full source.
1121 def _decode_and_decrypt_segments(self, ignored, data, offset):
1123 After the servermap update, I take the encrypted and encoded
1124 data that the servermap fetched while doing its update and
1125 transform it into decoded-and-decrypted plaintext that can be
1126 used by the new uploadable. I return a Deferred that fires with
# Retrieve is used here only for its decode machinery, not to download.
1129 r = Retrieve(self._node, self._storage_broker, self._servermap,
1131 # decode: takes in our blocks and salts from the servermap,
1132 # returns a Deferred that fires with the corresponding plaintext
1133 # segments. Does not download -- simply takes advantage of
1134 # existing infrastructure within the Retrieve class to avoid
1136 sm = self._servermap
1137 # XXX: If the methods in the servermap don't work as
1138 # abstractions, you should rewrite them instead of going around
# update_data maps shnum -> list of (verinfo, datum) pairs gathered
# during the MODE_WRITE servermap update.
1140 update_data = sm.update_data
1141 start_segments = {} # shnum -> start segment
1142 end_segments = {} # shnum -> end segment
1143 blockhashes = {} # shnum -> blockhash tree
1144 for (shnum, original_data) in update_data.iteritems():
# Keep only the entries belonging to the version we are updating.
# (Note this rebinds the `data` parameter inside the loop.)
1145 data = [d[1] for d in original_data if d[0] == self._version]
1147 # Every data entry in our list should now be share shnum for
1148 # a particular version of the mutable file, so all of the
1149 # entries should be identical.
1151 assert [x for x in data if x != datum] == []
# datum unpacks as (blockhash tree, start segment, end segment),
# per the three assignments below.
1153 blockhashes[shnum] = datum[0]
1154 start_segments[shnum] = datum[1]
1155 end_segments[shnum] = datum[2]
# Decode both boundary segments in parallel; the blockhash trees ride
# along unchanged. gatherResults delivers [start_seg, end_seg,
# blockhashes] to the next pipeline stage.
1157 d1 = r.decode(start_segments, self._start_segment)
1158 d2 = r.decode(end_segments, self._end_segment)
1159 d3 = defer.succeed(blockhashes)
1160 return deferredutil.gatherResults([d1, d2, d3])
# Stage 3 of the in-place update pipeline (see _update). NOTE(review):
# the docstring delimiters and original line 1172 (an additional
# TransformingUploadable constructor argument between `offset` and the
# decoded start segment — presumably the segment size) are elided from
# this listing; confirm against the full source.
1163 def _build_uploadable_and_finish(self, segments_and_bht, data, offset):
1165 After the process has the plaintext segments, I build the
1166 TransformingUploadable that the publisher will eventually
1167 re-upload to the grid. I then invoke the publisher with that
1168 uploadable, and return a Deferred when the publish operation has
1169 completed without issue.
# segments_and_bht is the gatherResults list from
# _decode_and_decrypt_segments: [0] = decoded start segment,
# [1] = decoded end segment, [2] = blockhash trees by shnum.
1171 u = TransformingUploadable(data, offset,
1173 segments_and_bht[0],
1174 segments_and_bht[1])
# Publish.update() replaces only the affected segments, using the
# blockhash trees to patch the share hash structures.
1175 p = Publish(self._node, self._storage_broker, self._servermap)
1176 return p.update(u, offset, segments_and_bht[2], self._version)
1179 def _update_servermap(self, mode=MODE_WRITE, update_range=None):
1181 I update the servermap. I return a Deferred that fires when the
1182 servermap update is done.
1185 u = ServermapUpdater(self._node, self._storage_broker, Monitor(),
1188 update_range=update_range)
1190 u = ServermapUpdater(self._node, self._storage_broker, Monitor(),