4 from zope.interface import implements
5 from twisted.internet import defer, reactor
6 from foolscap.api import eventually
7 from allmydata.interfaces import IMutableFileNode, ICheckable, ICheckResults, \
8 NotEnoughSharesError, MDMF_VERSION, SDMF_VERSION, IMutableUploadable, \
9 IMutableFileVersion, IWriteable
10 from allmydata.util import hashutil, log, consumer, deferredutil, mathutil
11 from allmydata.util.assertutil import precondition
12 from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI, \
13 WriteableMDMFFileURI, ReadonlyMDMFFileURI
14 from allmydata.monitor import Monitor
15 from pycryptopp.cipher.aes import AES
17 from allmydata.mutable.publish import Publish, MutableData,\
18 TransformingUploadable
19 from allmydata.mutable.common import MODE_READ, MODE_WRITE, MODE_CHECK, UnrecoverableFileError, \
20 ResponseCache, UncoordinatedWriteError
21 from allmydata.mutable.servermap import ServerMap, ServermapUpdater
22 from allmydata.mutable.retrieve import Retrieve
23 from allmydata.mutable.checker import MutableChecker, MutableCheckAndRepairer
24 from allmydata.mutable.repairer import Repairer
    # these parameters are copied from foolscap.reconnector, which gets them
    # from twisted.internet.protocol.ReconnectingClientFactory
    factor = 2.7182818284590451 # (math.e)
    jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
    # NOTE(review): the enclosing class statement and the __init__ def are
    # not visible in this chunk; self._delay below is presumably set in
    # __init__ from a class-level initialDelay attribute — confirm upstream.
    self._delay = self.initialDelay

    def delay(self, node, f):
        # grow the delay exponentially, then jitter it with a normal
        # distribution so competing writers do not retry in lockstep
        self._delay = self._delay * self.factor
        self._delay = random.normalvariate(self._delay,
                                           self._delay * self.jitter)
        # NOTE(review): 'd' is created on a line not visible here —
        # presumably a Deferred returned to the caller; confirm upstream.
        reactor.callLater(self._delay, d.callback, None)
49 # use nodemaker.create_mutable_file() to make one of these
51 class MutableFileNode:
52 implements(IMutableFileNode, ICheckable)
    def __init__(self, storage_broker, secret_holder,
                 default_encoding_parameters, history):
        """Create an uninitialized MutableFileNode.

        Callers must follow up with init_from_cap() or create_with_keys();
        use nodemaker.create_mutable_file() rather than building one
        directly.
        """
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._default_encoding_parameters = default_encoding_parameters
        self._history = history
        self._pubkey = None # filled in upon first read
        self._privkey = None # filled in if we're mutable
        # we keep track of the last encoding parameters that we use. These
        # are updated upon retrieve, and used by publish. If we publish
        # without ever reading (i.e. overwrite()), then we use these values.
        self._required_shares = default_encoding_parameters["k"]
        self._total_shares = default_encoding_parameters["n"]
        self._sharemap = {} # known shares, shnum-to-[nodeids]
        self._cache = ResponseCache()
        self._most_recent_size = None
        # filled in after __init__ if we're being created for the first time;
        # filled in by the servermap updater before publishing, otherwise.
        # set to this default value in case neither of those things happen,
        # or in case the servermap can't find any shares to tell us what
        # protocol version to use.
        self._protocol_version = None

        # all users of this MutableFileNode go through the serializer. This
        # takes advantage of the fact that Deferreds discard the callbacks
        # that they're done with, so we can keep using the same Deferred
        # forever without consuming more and more memory.
        self._serializer = defer.succeed(None)

        # Starting with MDMF, we can get these from caps if they're
        # there. Leave them alone for now; they'll be filled in by my
        # init_from_cap method if necessary.
        self._downloader_hints = {}
    # NOTE(review): __repr__ body fragment — the "def __repr__(self):" line
    # and the "else:" branch marker are not visible in this chunk. Shows
    # RO/RW status and the abbreviated cap once _uri is known.
    if hasattr(self, '_uri'):
        return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
    return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)
    def init_from_cap(self, filecap):
        """Populate keys and indexes from an existing mutable filecap.

        We have the URI, but we have not yet retrieved the public
        verification key, nor things like 'k' or 'N'. If and when someone
        wants to get our contents, we'll pull from shares and fill those
        in.
        """
        if isinstance(filecap, (WriteableMDMFFileURI, ReadonlyMDMFFileURI)):
            self._protocol_version = MDMF_VERSION
        elif isinstance(filecap, (ReadonlySSKFileURI, WriteableSSKFileURI)):
            self._protocol_version = SDMF_VERSION

        # NOTE(review): the assignment of self._uri (presumably
        # "self._uri = filecap") is on a line not visible in this chunk —
        # the attribute reads below depend on it; confirm upstream.
        self._writekey = None
        if not filecap.is_readonly() and filecap.is_mutable():
            self._writekey = self._uri.writekey
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        self._fingerprint = self._uri.fingerprint
        # the following values are learned during Retrieval
        #  self._required_shares
        # and these are needed for Publish. They are filled in by Retrieval
        # if possible, otherwise by the first peer that Publish talks to.
        self._encprivkey = None
        # NOTE(review): the original presumably ends with "return self"
        # (callers chain on the node) — the return line is not visible here.
    def create_with_keys(self, (pubkey, privkey), contents,
                         version=SDMF_VERSION):
        """Call this to create a brand-new mutable file. It will create the
        shares, find homes for them, and upload the initial contents (created
        with the same rules as IClient.create_mutable_file() ). Returns a
        Deferred that fires (with the MutableFileNode instance you should
        use) when it completes.
        """
        self._pubkey, self._privkey = pubkey, privkey
        pubkey_s = self._pubkey.serialize()
        privkey_s = self._privkey.serialize()
        # the writekey is derived from the signing key; the readkey and
        # storage index are in turn derived by the URI class below
        self._writekey = hashutil.ssk_writekey_hash(privkey_s)
        self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
        self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
        if version == MDMF_VERSION:
            self._uri = WriteableMDMFFileURI(self._writekey, self._fingerprint)
            self._protocol_version = version
        elif version == SDMF_VERSION:
            self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
            self._protocol_version = version
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        initial_contents = self._get_initial_contents(contents)
        return self._upload(initial_contents, None)
    def _get_initial_contents(self, contents):
        # Normalize 'contents' into an IMutableUploadable.
        # NOTE(review): several guard lines (the "if contents is None:"
        # branch and the returns for the str/uploadable cases, presumably)
        # are not visible in this chunk; fragments are kept as-is.
        return MutableData("")

        if isinstance(contents, str):
            return MutableData(contents)

        if IMutableUploadable.providedBy(contents):

        # otherwise 'contents' must be a callable producing an uploadable,
        # given this node as its argument
        assert callable(contents), "%s should be callable, not %s" % \
               (contents, type(contents))
        return contents(self)

    def _encrypt_privkey(self, writekey, privkey):
        # NOTE(review): the AES(writekey) construction and the return of
        # crypttext are on lines not visible in this chunk.
        crypttext = enc.process(privkey)

    def _decrypt_privkey(self, enc_privkey):
        # symmetric inverse of _encrypt_privkey, keyed by our writekey
        enc = AES(self._writekey)
        privkey = enc.process(enc_privkey)
        # NOTE(review): "return privkey" is on a line not visible here.
    # --- small setters used by Retrieve/ServermapUpdater to backfill state ---
    def _populate_pubkey(self, pubkey):
        self._pubkey = pubkey
    def _populate_required_shares(self, required_shares):
        self._required_shares = required_shares
    def _populate_total_shares(self, total_shares):
        self._total_shares = total_shares

    def _populate_privkey(self, privkey):
        self._privkey = privkey
    def _populate_encprivkey(self, encprivkey):
        self._encprivkey = encprivkey
    def _add_to_cache(self, verinfo, shnum, offset, data):
        # remember response bytes so later reads may avoid a round-trip
        self._cache.add(verinfo, shnum, offset, data)
    def _read_from_cache(self, verinfo, shnum, offset, length):
        # delegate to ResponseCache; see its read() for miss behavior
        return self._cache.read(verinfo, shnum, offset, length)
188 def get_write_enabler(self, server):
189 seed = server.get_foolscap_write_enabler_seed()
190 assert len(seed) == 20
191 return hashutil.ssk_write_enabler_hash(self._writekey, seed)
192 def get_renewal_secret(self, server):
193 crs = self._secret_holder.get_renewal_secret()
194 frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
195 lease_seed = server.get_lease_seed()
196 assert len(lease_seed) == 20
197 return hashutil.bucket_renewal_secret_hash(frs, lease_seed)
198 def get_cancel_secret(self, server):
199 ccs = self._secret_holder.get_cancel_secret()
200 fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
201 lease_seed = server.get_lease_seed()
202 assert len(lease_seed) == 20
203 return hashutil.bucket_cancel_secret_hash(fcs, lease_seed)
    def get_writekey(self):
        # None for read-only caps (see init_from_cap)
        return self._writekey
    def get_readkey(self):
        # NOTE(review): body (presumably "return self._readkey") is on a
        # line not visible in this chunk.
    def get_storage_index(self):
        return self._storage_index
    def get_fingerprint(self):
        return self._fingerprint
    def get_privkey(self):
        # NOTE(review): body not visible in this chunk.
    def get_encprivkey(self):
        return self._encprivkey
    def get_pubkey(self):
        # NOTE(review): body not visible in this chunk.
    def get_required_shares(self):
        # 'k' from the most recently seen encoding parameters
        return self._required_shares
    def get_total_shares(self):
        # 'n' from the most recently seen encoding parameters
        return self._total_shares
225 ####################################
229 return self._most_recent_size
231 def get_current_size(self):
232 d = self.get_size_of_best_version()
233 d.addCallback(self._stash_size)
236 def _stash_size(self, size):
237 self._most_recent_size = size
242 def get_readcap(self):
243 return self._uri.get_readonly()
244 def get_verify_cap(self):
245 return self._uri.get_verify_cap()
246 def get_repair_cap(self):
247 if self._uri.is_readonly():
252 return self._uri.to_string()
254 def get_write_uri(self):
255 if self.is_readonly():
257 return self._uri.to_string()
259 def get_readonly_uri(self):
260 return self._uri.get_readonly().to_string()
    def get_readonly(self):
        # return a read-only node for this same file
        if self.is_readonly():
            # NOTE(review): branch body (presumably "return self") is on a
            # line not visible in this chunk.
        ro = MutableFileNode(self._storage_broker, self._secret_holder,
                             self._default_encoding_parameters, self._history)
        ro.init_from_cap(self._uri.get_readonly())
        # NOTE(review): the "return ro" line is not visible in this chunk.

    def is_mutable(self):
        return self._uri.is_mutable()

    def is_readonly(self):
        return self._uri.is_readonly()

    def is_unknown(self):
        # NOTE(review): body not visible in this chunk.

    def is_allowed_in_immutable_directory(self):
        # only fully-immutable objects may live in deep-immutable directories
        return not self._uri.is_mutable()

    def raise_error(self):
        # NOTE(review): body not visible in this chunk.

    # NOTE(review): the "def __hash__(self):" line is not visible; the
    # return below presumably belongs to it.
        return hash((self.__class__, self._uri))
287 def __cmp__(self, them):
288 if cmp(type(self), type(them)):
289 return cmp(type(self), type(them))
290 if cmp(self.__class__, them.__class__):
291 return cmp(self.__class__, them.__class__)
292 return cmp(self._uri, them._uri)
295 #################################
298 def check(self, monitor, verify=False, add_lease=False):
299 checker = MutableChecker(self, self._storage_broker,
300 self._history, monitor)
301 return checker.check(verify, add_lease)
303 def check_and_repair(self, monitor, verify=False, add_lease=False):
304 checker = MutableCheckAndRepairer(self, self._storage_broker,
305 self._history, monitor)
306 return checker.check(verify, add_lease)
308 #################################
    def repair(self, check_results, force=False, monitor=None):
        # repair this file based on earlier check results
        assert ICheckResults(check_results)
        r = Repairer(self, check_results, self._storage_broker,
                     self._history, monitor)
        # NOTE(review): the call that starts the repairer (and the use of
        # 'force') are on lines not visible in this chunk.
319 #################################
    def get_best_readable_version(self):
        """
        I return a Deferred that fires with a MutableFileVersion
        representing the best readable version of the file that I
        represent.
        """
        return self.get_readable_version()
    def get_readable_version(self, servermap=None, version=None):
        """
        I return a Deferred that fires with an MutableFileVersion for my
        version argument, if there is a recoverable file of that version
        on the grid. If there is no recoverable version, I fire with an
        UnrecoverableFileError.

        If a servermap is provided, I look in there for the requested
        version. If no servermap is provided, I create and update a new
        one.

        If no version is provided, then I return a MutableFileVersion
        representing the best recoverable version of the file.
        """
        d = self._get_version_from_servermap(MODE_READ, servermap, version)
        def _build_version((servermap, their_version)):
            assert their_version in servermap.recoverable_versions()
            assert their_version in servermap.make_versionmap()

            # NOTE(review): several MutableFileVersion constructor arguments
            # (servermap, version, storage index, readcap, presumably) and
            # the trailing "return mfv" are on lines not visible here.
            mfv = MutableFileVersion(self,
                                     self._storage_broker,
                                     history=self._history)
            assert mfv.is_readonly()
            mfv.set_downloader_hints(self._downloader_hints)
            # our caller can use this to download the contents of the
            # mutable file.
        return d.addCallback(_build_version)
    def _get_version_from_servermap(self,
                                    # NOTE(review): the rest of the signature
                                    # (mode, servermap, version, presumably)
                                    # is on lines not visible in this chunk.
        """
        I return a Deferred that fires with (servermap, version).

        This function performs validation and a servermap update. If it
        returns (servermap, version), the caller can assume that:
         - servermap was last updated in mode.
         - version is recoverable, and corresponds to the servermap.

        If version and servermap are provided to me, I will validate
        that version exists in the servermap, and that the servermap was
        updated in the right mode.

        If version is not provided, but servermap is, I will validate
        the servermap and return the best recoverable version that I can
        find in the servermap.

        If the version is provided but the servermap isn't, I will
        obtain a servermap that has been updated in the correct mode and
        validate that version is found and recoverable.

        If neither servermap nor version are provided, I will obtain a
        servermap updated in the correct mode, and return the best
        recoverable version that I can find in there.
        """
        # reuse the supplied servermap only if its last update used 'mode'
        if servermap and servermap.get_last_update()[0] == mode:
            d = defer.succeed(servermap)
        # NOTE(review): the "else:" for the following line is not visible.
            d = self._get_servermap(mode)

        def _get_version(servermap, v):
            if v and v not in servermap.recoverable_versions():
                # NOTE(review): this branch's raise, and the elif selecting
                # the best version when v is None, are on lines not visible.
            v = servermap.best_recoverable_version()
            # NOTE(review): an "if not v:" guard presumably precedes this
            # raise — the guard line is not visible in this chunk.
            raise UnrecoverableFileError("no recoverable versions")

            return (servermap, v)
        return d.addCallback(_get_version, version)
    def download_best_version(self):
        """
        I return a Deferred that fires with the contents of the best
        version of this mutable file.
        """
        return self._do_serialized(self._download_best_version)
    def _download_best_version(self):
        """
        I am the serialized sibling of download_best_version.
        """
        d = self.get_best_readable_version()
        d.addCallback(self._record_size)
        d.addCallback(lambda version: version.download_to_data())

        # It is possible that the download will fail because there
        # aren't enough shares to be had. If so, we will try again after
        # updating the servermap in MODE_WRITE, which may find more
        # shares than updating in MODE_READ, as we just did. We can do
        # this by getting the best mutable version and downloading from
        # that -- the best mutable version will be a MutableFileVersion
        # with a servermap that was last updated in MODE_WRITE, as we
        # want. If this fails, then we give up.
        def _maybe_retry(failure):
            failure.trap(NotEnoughSharesError)

            d = self.get_best_mutable_version()
            d.addCallback(self._record_size)
            d.addCallback(lambda version: version.download_to_data())
            # NOTE(review): the "return d" for the retry path is not
            # visible in this chunk.
        d.addErrback(_maybe_retry)
        # NOTE(review): the final "return d" is not visible in this chunk.

    def _record_size(self, mfv):
        """
        I record the size of a mutable file version.
        """
        self._most_recent_size = mfv.get_size()
        # NOTE(review): presumably returns mfv for callback chaining; the
        # return line is not visible in this chunk.
    def get_size_of_best_version(self):
        """
        I return the size of the best version of this mutable file.

        This is equivalent to calling get_size() on the result of
        get_best_readable_version().
        """
        d = self.get_best_readable_version()
        return d.addCallback(lambda mfv: mfv.get_size())
466 #################################
    def get_best_mutable_version(self, servermap=None):
        """
        I return a Deferred that fires with a MutableFileVersion
        representing the best readable version of the file that I
        represent. I am like get_best_readable_version, except that I
        will try to make a writeable version if I can.
        """
        return self.get_mutable_version(servermap=servermap)
    def get_mutable_version(self, servermap=None, version=None):
        """
        I return a version of this mutable file. I return a Deferred
        that fires with a MutableFileVersion

        If version is provided, the Deferred will fire with a
        MutableFileVersion initialized with that version. Otherwise, it
        will fire with the best version that I can recover.

        If servermap is provided, I will use that to find versions
        instead of performing my own servermap update.
        """
        if self.is_readonly():
            return self.get_readable_version(servermap=servermap,
                                             # NOTE(review): the closing
                                             # "version=version)" line is
                                             # not visible in this chunk.

        # get_mutable_version => write intent, so we require that the
        # servermap is updated in MODE_WRITE
        d = self._get_version_from_servermap(MODE_WRITE, servermap, version)
        def _build_version((servermap, smap_version)):
            # these should have been set by the servermap update.
            assert self._secret_holder
            assert self._writekey

            # NOTE(review): several MutableFileVersion constructor arguments
            # and the trailing "return mfv" are on lines not visible here.
            mfv = MutableFileVersion(self,
                                     self._storage_broker,
                                     history=self._history)
            assert not mfv.is_readonly()
            mfv.set_downloader_hints(self._downloader_hints)
        return d.addCallback(_build_version)
519 # XXX: I'm uncomfortable with the difference between upload and
520 # overwrite, which, FWICT, is basically that you don't have to
521 # do a servermap update before you overwrite. We split them up
522 # that way anyway, so I guess there's no real difficulty in
523 # offering both ways to callers, but it also makes the
524 # public-facing API cluttery, and makes it hard to discern the
525 # right way of doing things.
527 # In general, we leave it to callers to ensure that they aren't
528 # going to cause UncoordinatedWriteErrors when working with
529 # MutableFileVersions. We know that the next three operations
530 # (upload, overwrite, and modify) will all operate on the same
531 # version, so we say that only one of them can be going on at once,
532 # and serialize them to ensure that that actually happens, since as
533 # the caller in this situation it is our job to do that.
    def overwrite(self, new_contents):
        """
        I overwrite the contents of the best recoverable version of this
        mutable file with new_contents. This is equivalent to calling
        overwrite on the result of get_best_mutable_version with
        new_contents as an argument. I return a Deferred that eventually
        fires with the results of my replacement process.
        """
        # TODO: Update downloader hints.
        return self._do_serialized(self._overwrite, new_contents)


    def _overwrite(self, new_contents):
        """
        I am the serialized sibling of overwrite.
        """
        d = self.get_best_mutable_version()
        d.addCallback(lambda mfv: mfv.overwrite(new_contents))
        d.addCallback(self._did_upload, new_contents.get_size())
        # NOTE(review): the final "return d" is on a line not visible here.


    def upload(self, new_contents, servermap):
        """
        I overwrite the contents of the best recoverable version of this
        mutable file with new_contents, using servermap instead of
        creating/updating our own servermap. I return a Deferred that
        fires with the results of my upload.
        """
        # TODO: Update downloader hints
        return self._do_serialized(self._upload, new_contents, servermap)
    def modify(self, modifier, backoffer=None):
        """
        I modify the contents of the best recoverable version of this
        mutable file with the modifier. This is equivalent to calling
        modify on the result of get_best_mutable_version. I return a
        Deferred that eventually fires with an UploadResults instance
        describing this process.
        """
        # TODO: Update downloader hints.
        return self._do_serialized(self._modify, modifier, backoffer)


    def _modify(self, modifier, backoffer):
        """
        I am the serialized sibling of modify.
        """
        d = self.get_best_mutable_version()
        d.addCallback(lambda mfv: mfv.modify(modifier, backoffer))
        # NOTE(review): the final "return d" is on a line not visible here.


    def download_version(self, servermap, version, fetch_privkey=False):
        """
        Download the specified version of this mutable file. I return a
        Deferred that fires with the contents of the specified version
        as a bytestring, or errbacks if the file is not recoverable.
        """
        d = self.get_readable_version(servermap, version)
        return d.addCallback(lambda mfv: mfv.download_to_data(fetch_privkey))
    def get_servermap(self, mode):
        """
        I return a servermap that has been updated in mode.

        mode should be one of MODE_READ, MODE_WRITE, MODE_CHECK or
        MODE_ANYTHING. See servermap.py for more on what these mean.
        """
        return self._do_serialized(self._get_servermap, mode)


    def _get_servermap(self, mode):
        """
        I am a serialized twin to get_servermap.
        """
        servermap = ServerMap()
        d = self._update_servermap(servermap, mode)
        # The servermap will tell us about the most recent size of the
        # file, so we may as well set that so that callers might get
        # more data about us.
        if not self._most_recent_size:
            d.addCallback(self._get_size_from_servermap)
        # NOTE(review): the final "return d" is on a line not visible here.
    def _get_size_from_servermap(self, servermap):
        """
        I extract the size of the best version of this file and record
        it in self._most_recent_size. I return the servermap that I was
        given.
        """
        if servermap.recoverable_versions():
            v = servermap.best_recoverable_version()
            size = v[4] # verinfo[4] == size
            self._most_recent_size = size
        # NOTE(review): the "return servermap" line is not visible here.


    def _update_servermap(self, servermap, mode):
        # NOTE(review): the ServermapUpdater call's remaining arguments, a
        # history guard (presumably "if self._history:"), and the return of
        # the updater's run are on lines not visible in this chunk.
        u = ServermapUpdater(self, self._storage_broker, Monitor(), servermap,
            self._history.notify_mapupdate(u.get_status())
643 #def set_version(self, version):
644 # I can be set in two ways:
645 # 1. When the node is created.
646 # 2. (for an existing share) when the Servermap is updated
648 # assert version in (MDMF_VERSION, SDMF_VERSION)
649 # self._protocol_version = version
    def get_version(self):
        # MDMF_VERSION or SDMF_VERSION (None until learned from a cap or
        # from the servermap; see __init__ and init_from_cap)
        return self._protocol_version


    def _do_serialized(self, cb, *args, **kwargs):
        # note: to avoid deadlock, this callable is *not* allowed to invoke
        # other serialized methods within this (or any other)
        # MutableFileNode. The callable should be a bound method of this same
        # class.
        # NOTE(review): the creation of 'd' (presumably
        # "d = defer.Deferred()") and the final "return d" are on lines not
        # visible in this chunk.
        self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
        # we need to put off d.callback until this Deferred is finished being
        # processed. Otherwise the caller's subsequent activities (like,
        # doing other things with this node) can cause reentrancy problems in
        # the Deferred code itself
        self._serializer.addBoth(lambda res: eventually(d.callback, res))
        # add a log.err just in case something really weird happens, because
        # self._serializer stays around forever, therefore we won't see the
        # usual Unhandled Error in Deferred that would give us a hint.
        self._serializer.addErrback(log.err)
    def _upload(self, new_contents, servermap):
        """
        A MutableFileNode still has to have some way of getting
        published initially, which is what I am here for. After that,
        all publishing, updating, modifying and so on happens through
        MutableFileVersions.
        """
        assert self._pubkey, "update_servermap must be called before publish"

        # XXX: Define IPublishInvoker with a set_downloader_hints method?
        # Then have the publisher call that method when it's done publishing?
        p = Publish(self, self._storage_broker, servermap)
        # NOTE(review): a history guard (presumably "if self._history:") and
        # the final "return d" are on lines not visible in this chunk.
        self._history.notify_publish(p.get_status(),
                                     new_contents.get_size())
        d = p.publish(new_contents)
        d.addCallback(self._did_upload, new_contents.get_size())

    def set_downloader_hints(self, hints):
        # hints learned from the cap or retrieval, handed to new versions
        self._downloader_hints = hints

    def _did_upload(self, res, size):
        # record the size we just published; NOTE(review): presumably also
        # returns res — the return line is not visible in this chunk.
        self._most_recent_size = size
703 class MutableFileVersion:
    """
    I represent a specific version (most likely the best version) of a
    mutable file.

    Since I implement IReadable, instances which hold a
    reference to an instance of me are guaranteed the ability (absent
    connection difficulties or unrecoverable versions) to read the file
    that I represent. Depending on whether I was initialized with a
    write capability or not, I may also provide callers the ability to
    overwrite or modify the contents of the mutable file that I
    represent.
    """
    implements(IMutableFileVersion, IWriteable)

    # NOTE(review): the "def __init__(...)" signature and several leading
    # parameter assignments are on lines not visible in this chunk.
        self._servermap = servermap
        self._version = version
        self._storage_index = storage_index
        self._write_secrets = write_secrets
        self._history = history
        self._storage_broker = storage_broker

        #assert isinstance(readcap, IURI)
        self._readcap = readcap

        # writekey is None for read-only versions (see is_readonly)
        self._writekey = writekey
        self._serializer = defer.succeed(None)
    def get_sequence_number(self):
        """
        Get the sequence number of the mutable version that I represent.
        """
        return self._version[0] # verinfo[0] == the sequence number


    def get_writekey(self):
        """
        I return a writekey or None if I don't have a writekey.
        """
        return self._writekey


    def set_downloader_hints(self, hints):
        """
        I set the downloader hints.
        """
        assert isinstance(hints, dict)

        self._downloader_hints = hints


    def get_downloader_hints(self):
        """
        I return the downloader hints.
        """
        return self._downloader_hints


    def overwrite(self, new_contents):
        """
        I overwrite the contents of this mutable file version with the
        data in new_contents.
        """
        assert not self.is_readonly()

        return self._do_serialized(self._overwrite, new_contents)
    def _overwrite(self, new_contents):
        # serialized sibling of overwrite(): requires an uploadable and a
        # servermap whose last update was made with write intent
        assert IMutableUploadable.providedBy(new_contents)
        assert self._servermap.get_last_update()[0] == MODE_WRITE

        return self._upload(new_contents)
    def modify(self, modifier, backoffer=None):
        """I use a modifier callback to apply a change to the mutable file.
        I implement the following pseudocode::

         obtain_mutable_filenode_lock()
         while True:
           update_servermap(MODE_WRITE)
           old = retrieve_best_version()
           new = modifier(old, servermap, first_time)
           try:
             publish(new)
           except UncoordinatedWriteError, e:
             backoff(e)
             continue
           break
         release_mutable_filenode_lock()

        The idea is that your modifier function can apply a delta of some
        sort, and it will be re-run as necessary until it succeeds. The
        modifier must inspect the old version to see whether its delta has
        already been applied: if so it should return the contents unmodified.

        Note that the modifier is required to run synchronously, and must not
        invoke any methods on this MutableFileNode instance.

        The backoff-er is a callable that is responsible for inserting a
        random delay between subsequent attempts, to help competing updates
        from colliding forever. It is also allowed to give up after a while.
        The backoffer is given two arguments: this MutableFileNode, and the
        Failure object that contains the UncoordinatedWriteError. It should
        return a Deferred that will fire when the next attempt should be
        made, or return the Failure if the loop should give up. If
        backoffer=None, a default one is provided which will perform
        exponential backoff, and give up after 4 tries. Note that the
        backoffer should not invoke any methods on this MutableFileNode
        instance, and it needs to be highly conscious of deadlock issues.
        """
        assert not self.is_readonly()

        return self._do_serialized(self._modify, modifier, backoffer)
837 def _modify(self, modifier, backoffer):
838 if backoffer is None:
839 backoffer = BackoffAgent().delay
840 return self._modify_and_retry(modifier, backoffer, True)
    def _modify_and_retry(self, modifier, backoffer, first_time):
        """
        I try to apply modifier to the contents of this version of the
        mutable file. If I succeed, I return an UploadResults instance
        describing my success. If I fail, I try again after waiting for
        a bit.
        """
        log.msg("doing modify")
        # NOTE(review): the branch structure here (presumably an
        # "if first_time:" / "else:" pair around the two servermap updates)
        # is on lines not visible in this chunk.
        d = self._update_servermap()
        # We ran into trouble; do MODE_CHECK so we're a little more
        # careful on subsequent tries.
        d = self._update_servermap(mode=MODE_CHECK)

        d.addCallback(lambda ignored:
                      self._modify_once(modifier, first_time))
        # NOTE(review): the "def _retry(f):" wrapper that the following
        # lines presumably belong to is not visible in this chunk.
        f.trap(UncoordinatedWriteError)
        # Uh oh, it broke. We're allowed to trust the servermap for our
        # first try, but after that we need to update it. It's
        # possible that we've failed due to a race with another
        # uploader, and if the race is to converge correctly, we
        # need to know about that upload.
        d2 = defer.maybeDeferred(backoffer, self, f)
        d2.addCallback(lambda ignored:
                       self._modify_and_retry(modifier,
        # NOTE(review): the remaining arguments of the recursive call, the
        # "return d2", the errback hookup, and the final "return d" are on
        # lines not visible in this chunk.
    def _modify_once(self, modifier, first_time):
        """
        I attempt to apply a modifier to the contents of the mutable
        file.
        """
        assert self._servermap.get_last_update()[0] != MODE_READ

        # download_to_data is serialized, so we have to call this to
        # download without re-entering the serializer
        d = self._try_to_download_data()
        def _apply(old_contents):
            new_contents = modifier(old_contents, self._servermap, first_time)
            precondition((isinstance(new_contents, str) or
                          new_contents is None),
                         "Modifier function must return a string "
            # NOTE(review): the closing argument of this precondition is on
            # a line not visible in this chunk.

            if new_contents is None or new_contents == old_contents:
                log.msg("no changes")
                # no changes need to be made
                # NOTE(review): the early return for the no-change case is
                # on lines not visible in this chunk.
            # However, since Publish is not automatically doing a
            # recovery when it observes UCWE, we need to do a second
            # publish. See #551 for details. We'll basically loop until
            # we managed an uncontested publish.
            old_uploadable = MutableData(old_contents)
            new_contents = old_uploadable
            # NOTE(review): the "else:" wrapping the next line is not
            # visible in this chunk.
            new_contents = MutableData(new_contents)

            return self._upload(new_contents)
        d.addCallback(_apply)
        # NOTE(review): the final "return d" is not visible in this chunk.
    def is_readonly(self):
        """
        I return True if this MutableFileVersion provides no write
        access to the file that it encapsulates, and False if it
        provides the ability to modify the file.
        """
        return self._writekey is None


    def is_mutable(self):
        """
        I return True, since mutable files are always mutable by
        somebody.
        """
        # NOTE(review): the "return True" line is not visible in this chunk.


    def get_storage_index(self):
        """
        I return the storage index of the reference that I encapsulate.
        """
        return self._storage_index


    # NOTE(review): the "def get_size(self):" line is not visible; the
    # docstring and return below presumably belong to it.
        """
        I return the length, in bytes, of this readable object.
        """
        return self._servermap.size_of_version(self._version)
    def download_to_data(self, fetch_privkey=False):
        """
        I return a Deferred that fires with the contents of this
        readable object as a byte string.
        """
        c = consumer.MemoryConsumer()
        d = self.read(c, fetch_privkey=fetch_privkey)
        d.addCallback(lambda mc: "".join(mc.chunks))
        # NOTE(review): the final "return d" is not visible in this chunk.


    def _try_to_download_data(self):
        """
        I am an unserialized cousin of download_to_data; I am called
        from the children of modify() to download the data associated
        with this mutable version.
        """
        c = consumer.MemoryConsumer()
        # modify will almost certainly write, so we need the privkey.
        d = self._read(c, fetch_privkey=True)
        d.addCallback(lambda mc: "".join(mc.chunks))
        # NOTE(review): the final "return d" is not visible in this chunk.


    def read(self, consumer, offset=0, size=None, fetch_privkey=False):
        """
        I read a portion (possibly all) of the mutable file that I
        reference into consumer.
        """
        return self._do_serialized(self._read, consumer, offset, size,
        # NOTE(review): the closing "fetch_privkey)" argument line is not
        # visible in this chunk.


    def _read(self, consumer, offset=0, size=None, fetch_privkey=False):
        """
        I am the serialized companion of read.
        """
        r = Retrieve(self._node, self._storage_broker, self._servermap,
                     self._version, fetch_privkey)
        # NOTE(review): a history guard (presumably "if self._history:") and
        # the final "return d" are on lines not visible in this chunk.
        self._history.notify_retrieve(r.get_status())
        d = r.download(consumer, offset, size)
    def _do_serialized(self, cb, *args, **kwargs):
        # note: to avoid deadlock, this callable is *not* allowed to invoke
        # other serialized methods within this (or any other)
        # MutableFileNode. The callable should be a bound method of this same
        # class.
        # NOTE(review): the creation of 'd' (presumably
        # "d = defer.Deferred()") and the final "return d" are on lines not
        # visible in this chunk.
        self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
        # we need to put off d.callback until this Deferred is finished being
        # processed. Otherwise the caller's subsequent activities (like,
        # doing other things with this node) can cause reentrancy problems in
        # the Deferred code itself
        self._serializer.addBoth(lambda res: eventually(d.callback, res))
        # add a log.err just in case something really weird happens, because
        # self._serializer stays around forever, therefore we won't see the
        # usual Unhandled Error in Deferred that would give us a hint.
        self._serializer.addErrback(log.err)


    def _upload(self, new_contents):
        # publish new_contents using our existing servermap
        #assert self._pubkey, "update_servermap must be called before publish"
        p = Publish(self._node, self._storage_broker, self._servermap)
        # NOTE(review): a history guard (presumably "if self._history:") and
        # the final "return d" are on lines not visible in this chunk.
        self._history.notify_publish(p.get_status(),
                                     new_contents.get_size())
        d = p.publish(new_contents)
        d.addCallback(self._did_upload, new_contents.get_size())


    def _did_upload(self, res, size):
        # record the published size; NOTE(review): presumably also returns
        # res — the return line is not visible in this chunk.
        self._most_recent_size = size
    def update(self, data, offset):
        """
        Do an update of this mutable file version by inserting data at
        offset within the file. If offset is the EOF, this is an append
        operation. I return a Deferred that fires with the results of
        the update operation when it has completed.

        In cases where update does not append any data, or where it does
        not append so many blocks that the block count crosses a
        power-of-two boundary, this operation will use roughly
        O(data.get_size()) memory/bandwidth/CPU to perform the update.
        Otherwise, it must download, re-encode, and upload the entire
        file again, which will use O(filesize) resources.
        """
        return self._do_serialized(self._update, data, offset)
    def _update(self, data, offset):
        """
        I update the mutable file version represented by this particular
        IMutableVersion by inserting the data in data at the offset
        offset. I return a Deferred that fires when this has been
        completed.
        """
        new_size = data.get_size() + offset
        old_size = self.get_size()
        segment_size = self._version[3]
        # NOTE(review): the closing "segment_size)" argument lines of the
        # two div_ceil calls are not visible in this chunk.
        num_old_segments = mathutil.div_ceil(old_size,
        num_new_segments = mathutil.div_ceil(new_size,
        log.msg("got %d old segments, %d new segments" % \
                (num_old_segments, num_new_segments))

        # We do a whole file re-encode if the file is an SDMF file.
        if self._version[2]: # version[2] == SDMF salt, which MDMF lacks
            log.msg("doing re-encode instead of in-place update")
            return self._do_modify_update(data, offset)

        # Otherwise, we can replace just the parts that are changing.
        log.msg("updating in place")
        d = self._do_update_update(data, offset)
        d.addCallback(self._decode_and_decrypt_segments, data, offset)
        d.addCallback(self._build_uploadable_and_finish, data, offset)
        # NOTE(review): the final "return d" is not visible in this chunk.
1070 def _do_modify_update(self, data, offset):
1072 I perform a file update by modifying the contents of the file
1073 after downloading it, then reuploading it. I am less efficient
1074 than _do_update_update, but am necessary for certain updates.
1076 def m(old, servermap, first_time):
1078 rest = offset + data.get_size()
1080 new += "".join(data.read(data.get_size()))
1083 return self._modify(m, None)
1086 def _do_update_update(self, data, offset):
1088 I start the Servermap update that gets us the data we need to
1089 continue the update process. I return a Deferred that fires when
1090 the servermap update is done.
1092 assert IMutableUploadable.providedBy(data)
1093 assert self.is_mutable()
1094 # offset == self.get_size() is valid and means that we are
1095 # appending data to the file.
1096 assert offset <= self.get_size()
1098 segsize = self._version[3]
1099 # We'll need the segment that the data starts in, regardless of
1100 # what we'll do later.
1101 start_segment = offset // segsize
1103 # We only need the end segment if the data we append does not go
1104 # beyond the current end-of-file.
1105 end_segment = start_segment
1106 if offset + data.get_size() < self.get_size():
1107 end_data = offset + data.get_size()
1108 # The last byte we touch is the end_data'th byte, which is actually
1109 # byte end_data - 1 because bytes are zero-indexed.
1111 end_segment = end_data // segsize
1113 self._start_segment = start_segment
1114 self._end_segment = end_segment
1116 # Now ask for the servermap to be updated in MODE_WRITE with
1117 # this update range.
1118 return self._update_servermap(update_range=(start_segment,
1122 def _decode_and_decrypt_segments(self, ignored, data, offset):
1124 After the servermap update, I take the encrypted and encoded
1125 data that the servermap fetched while doing its update and
1126 transform it into decoded-and-decrypted plaintext that can be
1127 used by the new uploadable. I return a Deferred that fires with
1130 r = Retrieve(self._node, self._storage_broker, self._servermap,
1132 # decode: takes in our blocks and salts from the servermap,
1133 # returns a Deferred that fires with the corresponding plaintext
1134 # segments. Does not download -- simply takes advantage of
1135 # existing infrastructure within the Retrieve class to avoid
1137 sm = self._servermap
1138 # XXX: If the methods in the servermap don't work as
1139 # abstractions, you should rewrite them instead of going around
1141 update_data = sm.update_data
1142 start_segments = {} # shnum -> start segment
1143 end_segments = {} # shnum -> end segment
1144 blockhashes = {} # shnum -> blockhash tree
1145 for (shnum, original_data) in update_data.iteritems():
1146 data = [d[1] for d in original_data if d[0] == self._version]
1147 # data is [(blockhashes,start,end)..]
1149 # Every data entry in our list should now be share shnum for
1150 # a particular version of the mutable file, so all of the
1151 # entries should be identical.
1153 assert [x for x in data if x != datum] == []
1155 # datum is (blockhashes,start,end)
1156 blockhashes[shnum] = datum[0]
1157 start_segments[shnum] = datum[1] # (block,salt) bytestrings
1158 end_segments[shnum] = datum[2]
1160 d1 = r.decode(start_segments, self._start_segment)
1161 d2 = r.decode(end_segments, self._end_segment)
1162 d3 = defer.succeed(blockhashes)
1163 return deferredutil.gatherResults([d1, d2, d3])
1166 def _build_uploadable_and_finish(self, segments_and_bht, data, offset):
1168 After the process has the plaintext segments, I build the
1169 TransformingUploadable that the publisher will eventually
1170 re-upload to the grid. I then invoke the publisher with that
1171 uploadable, and return a Deferred when the publish operation has
1172 completed without issue.
1174 u = TransformingUploadable(data, offset,
1176 segments_and_bht[0],
1177 segments_and_bht[1])
1178 p = Publish(self._node, self._storage_broker, self._servermap)
1179 return p.update(u, offset, segments_and_bht[2], self._version)
1182 def _update_servermap(self, mode=MODE_WRITE, update_range=None):
1184 I update the servermap. I return a Deferred that fires when the
1185 servermap update is done.
1188 u = ServermapUpdater(self._node, self._storage_broker, Monitor(),
1191 update_range=update_range)
1193 u = ServermapUpdater(self._node, self._storage_broker, Monitor(),