]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/filenode.py
MutableFileNode.set_downloader_hints: never depend upon order of dict.values()
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / filenode.py
1
2 import random
3
4 from zope.interface import implements
5 from twisted.internet import defer, reactor
6 from foolscap.api import eventually
7 from allmydata.interfaces import IMutableFileNode, ICheckable, ICheckResults, \
8      NotEnoughSharesError, MDMF_VERSION, SDMF_VERSION, IMutableUploadable, \
9      IMutableFileVersion, IWriteable
10 from allmydata.util import hashutil, log, consumer, deferredutil, mathutil
11 from allmydata.util.assertutil import precondition
12 from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI, \
13                           WriteableMDMFFileURI, ReadonlyMDMFFileURI
14 from allmydata.monitor import Monitor
15 from pycryptopp.cipher.aes import AES
16
17 from allmydata.mutable.publish import Publish, MutableData,\
18                                       TransformingUploadable
19 from allmydata.mutable.common import MODE_READ, MODE_WRITE, MODE_CHECK, UnrecoverableFileError, \
20      ResponseCache, UncoordinatedWriteError
21 from allmydata.mutable.servermap import ServerMap, ServermapUpdater
22 from allmydata.mutable.retrieve import Retrieve
23 from allmydata.mutable.checker import MutableChecker, MutableCheckAndRepairer
24 from allmydata.mutable.repairer import Repairer
25
26
27 class BackoffAgent:
28     # these parameters are copied from foolscap.reconnector, which gets them
29     # from twisted.internet.protocol.ReconnectingClientFactory
30     initialDelay = 1.0
31     factor = 2.7182818284590451 # (math.e)
32     jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
33     maxRetries = 4
34
35     def __init__(self):
36         self._delay = self.initialDelay
37         self._count = 0
38     def delay(self, node, f):
39         self._count += 1
40         if self._count == 4:
41             return f
42         self._delay = self._delay * self.factor
43         self._delay = random.normalvariate(self._delay,
44                                            self._delay * self.jitter)
45         d = defer.Deferred()
46         reactor.callLater(self._delay, d.callback, None)
47         return d
48
49 # use nodemaker.create_mutable_file() to make one of these
50
51 class MutableFileNode:
52     implements(IMutableFileNode, ICheckable)
53
54     def __init__(self, storage_broker, secret_holder,
55                  default_encoding_parameters, history):
56         self._storage_broker = storage_broker
57         self._secret_holder = secret_holder
58         self._default_encoding_parameters = default_encoding_parameters
59         self._history = history
60         self._pubkey = None # filled in upon first read
61         self._privkey = None # filled in if we're mutable
62         # we keep track of the last encoding parameters that we use. These
63         # are updated upon retrieve, and used by publish. If we publish
64         # without ever reading (i.e. overwrite()), then we use these values.
65         self._required_shares = default_encoding_parameters["k"]
66         self._total_shares = default_encoding_parameters["n"]
67         self._sharemap = {} # known shares, shnum-to-[nodeids]
68         self._cache = ResponseCache()
69         self._most_recent_size = None
70         # filled in after __init__ if we're being created for the first time;
71         # filled in by the servermap updater before publishing, otherwise.
72         # set to this default value in case neither of those things happen,
73         # or in case the servermap can't find any shares to tell us what
74         # to publish as.
75         self._protocol_version = None
76
77         # all users of this MutableFileNode go through the serializer. This
78         # takes advantage of the fact that Deferreds discard the callbacks
79         # that they're done with, so we can keep using the same Deferred
80         # forever without consuming more and more memory.
81         self._serializer = defer.succeed(None)
82
83         # Starting with MDMF, we can get these from caps if they're
84         # there. Leave them alone for now; they'll be filled in by my
85         # init_from_cap method if necessary.
86         self._downloader_hints = {}
87
88     def __repr__(self):
89         if hasattr(self, '_uri'):
90             return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
91         else:
92             return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)
93
94     def init_from_cap(self, filecap):
95         # we have the URI, but we have not yet retrieved the public
96         # verification key, nor things like 'k' or 'N'. If and when someone
97         # wants to get our contents, we'll pull from shares and fill those
98         # in.
99         if isinstance(filecap, (WriteableMDMFFileURI, ReadonlyMDMFFileURI)):
100             self._protocol_version = MDMF_VERSION
101         elif isinstance(filecap, (ReadonlySSKFileURI, WriteableSSKFileURI)):
102             self._protocol_version = SDMF_VERSION
103
104         self._uri = filecap
105         self._writekey = None
106
107         if not filecap.is_readonly() and filecap.is_mutable():
108             self._writekey = self._uri.writekey
109         self._readkey = self._uri.readkey
110         self._storage_index = self._uri.storage_index
111         self._fingerprint = self._uri.fingerprint
112         # the following values are learned during Retrieval
113         #  self._pubkey
114         #  self._required_shares
115         #  self._total_shares
116         # and these are needed for Publish. They are filled in by Retrieval
117         # if possible, otherwise by the first peer that Publish talks to.
118         self._privkey = None
119         self._encprivkey = None
120
121         # Starting with MDMF caps, we allowed arbitrary extensions in
122         # caps. If we were initialized with a cap that had extensions,
123         # we want to remember them so we can tell MutableFileVersions
124         # about them.
125         extensions = self._uri.get_extension_params()
126         if extensions:
127             extensions = map(int, extensions)
128             suspected_k, suspected_segsize = extensions
129             self._downloader_hints['k'] = suspected_k
130             self._downloader_hints['segsize'] = suspected_segsize
131
132         return self
133
134     def create_with_keys(self, (pubkey, privkey), contents,
135                          version=SDMF_VERSION):
136         """Call this to create a brand-new mutable file. It will create the
137         shares, find homes for them, and upload the initial contents (created
138         with the same rules as IClient.create_mutable_file() ). Returns a
139         Deferred that fires (with the MutableFileNode instance you should
140         use) when it completes.
141         """
142         self._pubkey, self._privkey = pubkey, privkey
143         pubkey_s = self._pubkey.serialize()
144         privkey_s = self._privkey.serialize()
145         self._writekey = hashutil.ssk_writekey_hash(privkey_s)
146         self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
147         self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
148         if version == MDMF_VERSION:
149             self._uri = WriteableMDMFFileURI(self._writekey, self._fingerprint)
150             self._protocol_version = version
151         elif version == SDMF_VERSION:
152             self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
153             self._protocol_version = version
154         self._readkey = self._uri.readkey
155         self._storage_index = self._uri.storage_index
156         initial_contents = self._get_initial_contents(contents)
157         return self._upload(initial_contents, None)
158
159     def _get_initial_contents(self, contents):
160         if contents is None:
161             return MutableData("")
162
163         if isinstance(contents, str):
164             return MutableData(contents)
165
166         if IMutableUploadable.providedBy(contents):
167             return contents
168
169         assert callable(contents), "%s should be callable, not %s" % \
170                (contents, type(contents))
171         return contents(self)
172
173     def _encrypt_privkey(self, writekey, privkey):
174         enc = AES(writekey)
175         crypttext = enc.process(privkey)
176         return crypttext
177
178     def _decrypt_privkey(self, enc_privkey):
179         enc = AES(self._writekey)
180         privkey = enc.process(enc_privkey)
181         return privkey
182
183     def _populate_pubkey(self, pubkey):
184         self._pubkey = pubkey
185     def _populate_required_shares(self, required_shares):
186         self._required_shares = required_shares
187     def _populate_total_shares(self, total_shares):
188         self._total_shares = total_shares
189
190     def _populate_privkey(self, privkey):
191         self._privkey = privkey
192     def _populate_encprivkey(self, encprivkey):
193         self._encprivkey = encprivkey
194     def _add_to_cache(self, verinfo, shnum, offset, data):
195         self._cache.add(verinfo, shnum, offset, data)
196     def _read_from_cache(self, verinfo, shnum, offset, length):
197         return self._cache.read(verinfo, shnum, offset, length)
198
199     def get_write_enabler(self, peerid):
200         assert len(peerid) == 20
201         return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
202     def get_renewal_secret(self, peerid):
203         assert len(peerid) == 20
204         crs = self._secret_holder.get_renewal_secret()
205         frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
206         return hashutil.bucket_renewal_secret_hash(frs, peerid)
207     def get_cancel_secret(self, peerid):
208         assert len(peerid) == 20
209         ccs = self._secret_holder.get_cancel_secret()
210         fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
211         return hashutil.bucket_cancel_secret_hash(fcs, peerid)
212
213     def get_writekey(self):
214         return self._writekey
215     def get_readkey(self):
216         return self._readkey
217     def get_storage_index(self):
218         return self._storage_index
219     def get_fingerprint(self):
220         return self._fingerprint
221     def get_privkey(self):
222         return self._privkey
223     def get_encprivkey(self):
224         return self._encprivkey
225     def get_pubkey(self):
226         return self._pubkey
227
228     def get_required_shares(self):
229         return self._required_shares
230     def get_total_shares(self):
231         return self._total_shares
232
233     ####################################
234     # IFilesystemNode
235
236     def get_size(self):
237         return self._most_recent_size
238
239     def get_current_size(self):
240         d = self.get_size_of_best_version()
241         d.addCallback(self._stash_size)
242         return d
243
244     def _stash_size(self, size):
245         self._most_recent_size = size
246         return size
247
248     def get_cap(self):
249         return self._uri
250     def get_readcap(self):
251         return self._uri.get_readonly()
252     def get_verify_cap(self):
253         return self._uri.get_verify_cap()
254     def get_repair_cap(self):
255         if self._uri.is_readonly():
256             return None
257         return self._uri
258
259     def get_uri(self):
260         return self._uri.to_string()
261
262     def get_write_uri(self):
263         if self.is_readonly():
264             return None
265         return self._uri.to_string()
266
267     def get_readonly_uri(self):
268         return self._uri.get_readonly().to_string()
269
270     def get_readonly(self):
271         if self.is_readonly():
272             return self
273         ro = MutableFileNode(self._storage_broker, self._secret_holder,
274                              self._default_encoding_parameters, self._history)
275         ro.init_from_cap(self._uri.get_readonly())
276         return ro
277
278     def is_mutable(self):
279         return self._uri.is_mutable()
280
281     def is_readonly(self):
282         return self._uri.is_readonly()
283
284     def is_unknown(self):
285         return False
286
287     def is_allowed_in_immutable_directory(self):
288         return not self._uri.is_mutable()
289
290     def raise_error(self):
291         pass
292
293     def __hash__(self):
294         return hash((self.__class__, self._uri))
295     def __cmp__(self, them):
296         if cmp(type(self), type(them)):
297             return cmp(type(self), type(them))
298         if cmp(self.__class__, them.__class__):
299             return cmp(self.__class__, them.__class__)
300         return cmp(self._uri, them._uri)
301
302
303     #################################
304     # ICheckable
305
306     def check(self, monitor, verify=False, add_lease=False):
307         checker = MutableChecker(self, self._storage_broker,
308                                  self._history, monitor)
309         return checker.check(verify, add_lease)
310
311     def check_and_repair(self, monitor, verify=False, add_lease=False):
312         checker = MutableCheckAndRepairer(self, self._storage_broker,
313                                           self._history, monitor)
314         return checker.check(verify, add_lease)
315
316     #################################
317     # IRepairable
318
319     def repair(self, check_results, force=False):
320         assert ICheckResults(check_results)
321         r = Repairer(self, check_results)
322         d = r.start(force)
323         return d
324
325
326     #################################
327     # IFileNode
328
329     def get_best_readable_version(self):
330         """
331         I return a Deferred that fires with a MutableFileVersion
332         representing the best readable version of the file that I
333         represent
334         """
335         return self.get_readable_version()
336
337
338     def get_readable_version(self, servermap=None, version=None):
339         """
340         I return a Deferred that fires with an MutableFileVersion for my
341         version argument, if there is a recoverable file of that version
342         on the grid. If there is no recoverable version, I fire with an
343         UnrecoverableFileError.
344
345         If a servermap is provided, I look in there for the requested
346         version. If no servermap is provided, I create and update a new
347         one.
348
349         If no version is provided, then I return a MutableFileVersion
350         representing the best recoverable version of the file.
351         """
352         d = self._get_version_from_servermap(MODE_READ, servermap, version)
353         def _build_version((servermap, their_version)):
354             assert their_version in servermap.recoverable_versions()
355             assert their_version in servermap.make_versionmap()
356
357             mfv = MutableFileVersion(self,
358                                      servermap,
359                                      their_version,
360                                      self._storage_index,
361                                      self._storage_broker,
362                                      self._readkey,
363                                      history=self._history)
364             assert mfv.is_readonly()
365             mfv.set_downloader_hints(self._downloader_hints)
366             # our caller can use this to download the contents of the
367             # mutable file.
368             return mfv
369         return d.addCallback(_build_version)
370
371
372     def _get_version_from_servermap(self,
373                                     mode,
374                                     servermap=None,
375                                     version=None):
376         """
377         I return a Deferred that fires with (servermap, version).
378
379         This function performs validation and a servermap update. If it
380         returns (servermap, version), the caller can assume that:
381             - servermap was last updated in mode.
382             - version is recoverable, and corresponds to the servermap.
383
384         If version and servermap are provided to me, I will validate
385         that version exists in the servermap, and that the servermap was
386         updated correctly.
387
388         If version is not provided, but servermap is, I will validate
389         the servermap and return the best recoverable version that I can
390         find in the servermap.
391
392         If the version is provided but the servermap isn't, I will
393         obtain a servermap that has been updated in the correct mode and
394         validate that version is found and recoverable.
395
396         If neither servermap nor version are provided, I will obtain a
397         servermap updated in the correct mode, and return the best
398         recoverable version that I can find in there.
399         """
400         # XXX: wording ^^^^
401         if servermap and servermap.last_update_mode == mode:
402             d = defer.succeed(servermap)
403         else:
404             d = self._get_servermap(mode)
405
406         def _get_version(servermap, v):
407             if v and v not in servermap.recoverable_versions():
408                 v = None
409             elif not v:
410                 v = servermap.best_recoverable_version()
411             if not v:
412                 raise UnrecoverableFileError("no recoverable versions")
413
414             return (servermap, v)
415         return d.addCallback(_get_version, version)
416
417
418     def download_best_version(self):
419         """
420         I return a Deferred that fires with the contents of the best
421         version of this mutable file.
422         """
423         return self._do_serialized(self._download_best_version)
424
425
426     def _download_best_version(self):
427         """
428         I am the serialized sibling of download_best_version.
429         """
430         d = self.get_best_readable_version()
431         d.addCallback(self._record_size)
432         d.addCallback(lambda version: version.download_to_data())
433
434         # It is possible that the download will fail because there
435         # aren't enough shares to be had. If so, we will try again after
436         # updating the servermap in MODE_WRITE, which may find more
437         # shares than updating in MODE_READ, as we just did. We can do
438         # this by getting the best mutable version and downloading from
439         # that -- the best mutable version will be a MutableFileVersion
440         # with a servermap that was last updated in MODE_WRITE, as we
441         # want. If this fails, then we give up.
442         def _maybe_retry(failure):
443             failure.trap(NotEnoughSharesError)
444
445             d = self.get_best_mutable_version()
446             d.addCallback(self._record_size)
447             d.addCallback(lambda version: version.download_to_data())
448             return d
449
450         d.addErrback(_maybe_retry)
451         return d
452
453
454     def _record_size(self, mfv):
455         """
456         I record the size of a mutable file version.
457         """
458         self._most_recent_size = mfv.get_size()
459         return mfv
460
461
462     def get_size_of_best_version(self):
463         """
464         I return the size of the best version of this mutable file.
465
466         This is equivalent to calling get_size() on the result of
467         get_best_readable_version().
468         """
469         d = self.get_best_readable_version()
470         return d.addCallback(lambda mfv: mfv.get_size())
471
472
473     #################################
474     # IMutableFileNode
475
476     def get_best_mutable_version(self, servermap=None):
477         """
478         I return a Deferred that fires with a MutableFileVersion
479         representing the best readable version of the file that I
480         represent. I am like get_best_readable_version, except that I
481         will try to make a writeable version if I can.
482         """
483         return self.get_mutable_version(servermap=servermap)
484
485
486     def get_mutable_version(self, servermap=None, version=None):
487         """
488         I return a version of this mutable file. I return a Deferred
489         that fires with a MutableFileVersion
490
491         If version is provided, the Deferred will fire with a
492         MutableFileVersion initailized with that version. Otherwise, it
493         will fire with the best version that I can recover.
494
495         If servermap is provided, I will use that to find versions
496         instead of performing my own servermap update.
497         """
498         if self.is_readonly():
499             return self.get_readable_version(servermap=servermap,
500                                              version=version)
501
502         # get_mutable_version => write intent, so we require that the
503         # servermap is updated in MODE_WRITE
504         d = self._get_version_from_servermap(MODE_WRITE, servermap, version)
505         def _build_version((servermap, smap_version)):
506             # these should have been set by the servermap update.
507             assert self._secret_holder
508             assert self._writekey
509
510             mfv = MutableFileVersion(self,
511                                      servermap,
512                                      smap_version,
513                                      self._storage_index,
514                                      self._storage_broker,
515                                      self._readkey,
516                                      self._writekey,
517                                      self._secret_holder,
518                                      history=self._history)
519             assert not mfv.is_readonly()
520             mfv.set_downloader_hints(self._downloader_hints)
521             return mfv
522
523         return d.addCallback(_build_version)
524
525
526     # XXX: I'm uncomfortable with the difference between upload and
527     #      overwrite, which, FWICT, is basically that you don't have to
528     #      do a servermap update before you overwrite. We split them up
529     #      that way anyway, so I guess there's no real difficulty in
530     #      offering both ways to callers, but it also makes the
531     #      public-facing API cluttery, and makes it hard to discern the
532     #      right way of doing things.
533
534     # In general, we leave it to callers to ensure that they aren't
535     # going to cause UncoordinatedWriteErrors when working with
536     # MutableFileVersions. We know that the next three operations
537     # (upload, overwrite, and modify) will all operate on the same
538     # version, so we say that only one of them can be going on at once,
539     # and serialize them to ensure that that actually happens, since as
540     # the caller in this situation it is our job to do that.
541     def overwrite(self, new_contents):
542         """
543         I overwrite the contents of the best recoverable version of this
544         mutable file with new_contents. This is equivalent to calling
545         overwrite on the result of get_best_mutable_version with
546         new_contents as an argument. I return a Deferred that eventually
547         fires with the results of my replacement process.
548         """
549         # TODO: Update downloader hints.
550         return self._do_serialized(self._overwrite, new_contents)
551
552
553     def _overwrite(self, new_contents):
554         """
555         I am the serialized sibling of overwrite.
556         """
557         d = self.get_best_mutable_version()
558         d.addCallback(lambda mfv: mfv.overwrite(new_contents))
559         d.addCallback(self._did_upload, new_contents.get_size())
560         return d
561
562
563     def upload(self, new_contents, servermap):
564         """
565         I overwrite the contents of the best recoverable version of this
566         mutable file with new_contents, using servermap instead of
567         creating/updating our own servermap. I return a Deferred that
568         fires with the results of my upload.
569         """
570         # TODO: Update downloader hints
571         return self._do_serialized(self._upload, new_contents, servermap)
572
573
574     def modify(self, modifier, backoffer=None):
575         """
576         I modify the contents of the best recoverable version of this
577         mutable file with the modifier. This is equivalent to calling
578         modify on the result of get_best_mutable_version. I return a
579         Deferred that eventually fires with an UploadResults instance
580         describing this process.
581         """
582         # TODO: Update downloader hints.
583         return self._do_serialized(self._modify, modifier, backoffer)
584
585
586     def _modify(self, modifier, backoffer):
587         """
588         I am the serialized sibling of modify.
589         """
590         d = self.get_best_mutable_version()
591         d.addCallback(lambda mfv: mfv.modify(modifier, backoffer))
592         return d
593
594
595     def download_version(self, servermap, version, fetch_privkey=False):
596         """
597         Download the specified version of this mutable file. I return a
598         Deferred that fires with the contents of the specified version
599         as a bytestring, or errbacks if the file is not recoverable.
600         """
601         d = self.get_readable_version(servermap, version)
602         return d.addCallback(lambda mfv: mfv.download_to_data(fetch_privkey))
603
604
605     def get_servermap(self, mode):
606         """
607         I return a servermap that has been updated in mode.
608
609         mode should be one of MODE_READ, MODE_WRITE, MODE_CHECK or
610         MODE_ANYTHING. See servermap.py for more on what these mean.
611         """
612         return self._do_serialized(self._get_servermap, mode)
613
614
615     def _get_servermap(self, mode):
616         """
617         I am a serialized twin to get_servermap.
618         """
619         servermap = ServerMap()
620         d = self._update_servermap(servermap, mode)
621         # The servermap will tell us about the most recent size of the
622         # file, so we may as well set that so that callers might get
623         # more data about us.
624         if not self._most_recent_size:
625             d.addCallback(self._get_size_from_servermap)
626         return d
627
628
629     def _get_size_from_servermap(self, servermap):
630         """
631         I extract the size of the best version of this file and record
632         it in self._most_recent_size. I return the servermap that I was
633         given.
634         """
635         if servermap.recoverable_versions():
636             v = servermap.best_recoverable_version()
637             size = v[4] # verinfo[4] == size
638             self._most_recent_size = size
639         return servermap
640
641
642     def _update_servermap(self, servermap, mode):
643         u = ServermapUpdater(self, self._storage_broker, Monitor(), servermap,
644                              mode)
645         if self._history:
646             self._history.notify_mapupdate(u.get_status())
647         return u.update()
648
649
650     #def set_version(self, version):
651         # I can be set in two ways:
652         #  1. When the node is created.
653         #  2. (for an existing share) when the Servermap is updated 
654         #     before I am read.
655     #    assert version in (MDMF_VERSION, SDMF_VERSION)
656     #    self._protocol_version = version
657
658
659     def get_version(self):
660         return self._protocol_version
661
662
663     def _do_serialized(self, cb, *args, **kwargs):
664         # note: to avoid deadlock, this callable is *not* allowed to invoke
665         # other serialized methods within this (or any other)
666         # MutableFileNode. The callable should be a bound method of this same
667         # MFN instance.
668         d = defer.Deferred()
669         self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
670         # we need to put off d.callback until this Deferred is finished being
671         # processed. Otherwise the caller's subsequent activities (like,
672         # doing other things with this node) can cause reentrancy problems in
673         # the Deferred code itself
674         self._serializer.addBoth(lambda res: eventually(d.callback, res))
675         # add a log.err just in case something really weird happens, because
676         # self._serializer stays around forever, therefore we won't see the
677         # usual Unhandled Error in Deferred that would give us a hint.
678         self._serializer.addErrback(log.err)
679         return d
680
681
682     def _upload(self, new_contents, servermap):
683         """
684         A MutableFileNode still has to have some way of getting
685         published initially, which is what I am here for. After that,
686         all publishing, updating, modifying and so on happens through
687         MutableFileVersions.
688         """
689         assert self._pubkey, "update_servermap must be called before publish"
690
691         # Define IPublishInvoker with a set_downloader_hints method?
692         # Then have the publisher call that method when it's done publishing?
693         p = Publish(self, self._storage_broker, servermap)
694         if self._history:
695             self._history.notify_publish(p.get_status(),
696                                          new_contents.get_size())
697         d = p.publish(new_contents)
698         d.addCallback(self._did_upload, new_contents.get_size())
699         return d
700
701
702     def set_downloader_hints(self, hints):
703         self._downloader_hints = hints
704         extensions = [ hints["k"], hints["segsize"] ]
705         self._uri.set_extension_params(extensions)
706
707
708     def _did_upload(self, res, size):
709         self._most_recent_size = size
710         return res
711
712
713 class MutableFileVersion:
714     """
715     I represent a specific version (most likely the best version) of a
716     mutable file.
717
718     Since I implement IReadable, instances which hold a
719     reference to an instance of me are guaranteed the ability (absent
720     connection difficulties or unrecoverable versions) to read the file
721     that I represent. Depending on whether I was initialized with a
722     write capability or not, I may also provide callers the ability to
723     overwrite or modify the contents of the mutable file that I
724     reference.
725     """
726     implements(IMutableFileVersion, IWriteable)
727
728     def __init__(self,
729                  node,
730                  servermap,
731                  version,
732                  storage_index,
733                  storage_broker,
734                  readcap,
735                  writekey=None,
736                  write_secrets=None,
737                  history=None):
738
739         self._node = node
740         self._servermap = servermap
741         self._version = version
742         self._storage_index = storage_index
743         self._write_secrets = write_secrets
744         self._history = history
745         self._storage_broker = storage_broker
746
747         #assert isinstance(readcap, IURI)
748         self._readcap = readcap
749
750         self._writekey = writekey
751         self._serializer = defer.succeed(None)
752
753
754     def get_sequence_number(self):
755         """
756         Get the sequence number of the mutable version that I represent.
757         """
758         return self._version[0] # verinfo[0] == the sequence number
759
760
761     # TODO: Terminology?
762     def get_writekey(self):
763         """
764         I return a writekey or None if I don't have a writekey.
765         """
766         return self._writekey
767
768
769     def set_downloader_hints(self, hints):
770         """
771         I set the downloader hints.
772         """
773         assert isinstance(hints, dict)
774
775         self._downloader_hints = hints
776
777
778     def get_downloader_hints(self):
779         """
780         I return the downloader hints.
781         """
782         return self._downloader_hints
783
784
785     def overwrite(self, new_contents):
786         """
787         I overwrite the contents of this mutable file version with the
788         data in new_contents.
789         """
790         assert not self.is_readonly()
791
792         return self._do_serialized(self._overwrite, new_contents)
793
794
795     def _overwrite(self, new_contents):
796         assert IMutableUploadable.providedBy(new_contents)
797         assert self._servermap.last_update_mode == MODE_WRITE
798
799         return self._upload(new_contents)
800
801
802     def modify(self, modifier, backoffer=None):
803         """I use a modifier callback to apply a change to the mutable file.
804         I implement the following pseudocode::
805
806          obtain_mutable_filenode_lock()
807          first_time = True
808          while True:
809            update_servermap(MODE_WRITE)
810            old = retrieve_best_version()
811            new = modifier(old, servermap, first_time)
812            first_time = False
813            if new == old: break
814            try:
815              publish(new)
816            except UncoordinatedWriteError, e:
817              backoffer(e)
818              continue
819            break
820          release_mutable_filenode_lock()
821
822         The idea is that your modifier function can apply a delta of some
823         sort, and it will be re-run as necessary until it succeeds. The
824         modifier must inspect the old version to see whether its delta has
825         already been applied: if so it should return the contents unmodified.
826
827         Note that the modifier is required to run synchronously, and must not
828         invoke any methods on this MutableFileNode instance.
829
830         The backoff-er is a callable that is responsible for inserting a
831         random delay between subsequent attempts, to help competing updates
832         from colliding forever. It is also allowed to give up after a while.
833         The backoffer is given two arguments: this MutableFileNode, and the
834         Failure object that contains the UncoordinatedWriteError. It should
835         return a Deferred that will fire when the next attempt should be
836         made, or return the Failure if the loop should give up. If
837         backoffer=None, a default one is provided which will perform
838         exponential backoff, and give up after 4 tries. Note that the
839         backoffer should not invoke any methods on this MutableFileNode
840         instance, and it needs to be highly conscious of deadlock issues.
841         """
842         assert not self.is_readonly()
843
844         return self._do_serialized(self._modify, modifier, backoffer)
845
846
847     def _modify(self, modifier, backoffer):
848         if backoffer is None:
849             backoffer = BackoffAgent().delay
850         return self._modify_and_retry(modifier, backoffer, True)
851
852
853     def _modify_and_retry(self, modifier, backoffer, first_time):
854         """
855         I try to apply modifier to the contents of this version of the
856         mutable file. If I succeed, I return an UploadResults instance
857         describing my success. If I fail, I try again after waiting for
858         a little bit.
859         """
860         log.msg("doing modify")
861         if first_time:
862             d = self._update_servermap()
863         else:
864             # We ran into trouble; do MODE_CHECK so we're a little more
865             # careful on subsequent tries.
866             d = self._update_servermap(mode=MODE_CHECK)
867
868         d.addCallback(lambda ignored:
869             self._modify_once(modifier, first_time))
870         def _retry(f):
871             f.trap(UncoordinatedWriteError)
872             # Uh oh, it broke. We're allowed to trust the servermap for our
873             # first try, but after that we need to update it. It's
874             # possible that we've failed due to a race with another
875             # uploader, and if the race is to converge correctly, we
876             # need to know about that upload.
877             d2 = defer.maybeDeferred(backoffer, self, f)
878             d2.addCallback(lambda ignored:
879                            self._modify_and_retry(modifier,
880                                                   backoffer, False))
881             return d2
882         d.addErrback(_retry)
883         return d
884
885
886     def _modify_once(self, modifier, first_time):
887         """
888         I attempt to apply a modifier to the contents of the mutable
889         file.
890         """
891         assert self._servermap.last_update_mode != MODE_READ
892
893         # download_to_data is serialized, so we have to call this to
894         # avoid deadlock.
895         d = self._try_to_download_data()
896         def _apply(old_contents):
897             new_contents = modifier(old_contents, self._servermap, first_time)
898             precondition((isinstance(new_contents, str) or
899                           new_contents is None),
900                          "Modifier function must return a string "
901                          "or None")
902
903             if new_contents is None or new_contents == old_contents:
904                 log.msg("no changes")
905                 # no changes need to be made
906                 if first_time:
907                     return
908                 # However, since Publish is not automatically doing a
909                 # recovery when it observes UCWE, we need to do a second
910                 # publish. See #551 for details. We'll basically loop until
911                 # we managed an uncontested publish.
912                 old_uploadable = MutableData(old_contents)
913                 new_contents = old_uploadable
914             else:
915                 new_contents = MutableData(new_contents)
916
917             return self._upload(new_contents)
918         d.addCallback(_apply)
919         return d
920
921
922     def is_readonly(self):
923         """
924         I return True if this MutableFileVersion provides no write
925         access to the file that it encapsulates, and False if it
926         provides the ability to modify the file.
927         """
928         return self._writekey is None
929
930
931     def is_mutable(self):
932         """
933         I return True, since mutable files are always mutable by
934         somebody.
935         """
936         return True
937
938
939     def get_storage_index(self):
940         """
941         I return the storage index of the reference that I encapsulate.
942         """
943         return self._storage_index
944
945
946     def get_size(self):
947         """
948         I return the length, in bytes, of this readable object.
949         """
950         return self._servermap.size_of_version(self._version)
951
952
953     def download_to_data(self, fetch_privkey=False):
954         """
955         I return a Deferred that fires with the contents of this
956         readable object as a byte string.
957
958         """
959         c = consumer.MemoryConsumer()
960         d = self.read(c, fetch_privkey=fetch_privkey)
961         d.addCallback(lambda mc: "".join(mc.chunks))
962         return d
963
964
965     def _try_to_download_data(self):
966         """
967         I am an unserialized cousin of download_to_data; I am called
968         from the children of modify() to download the data associated
969         with this mutable version.
970         """
971         c = consumer.MemoryConsumer()
972         # modify will almost certainly write, so we need the privkey.
973         d = self._read(c, fetch_privkey=True)
974         d.addCallback(lambda mc: "".join(mc.chunks))
975         return d
976
977
978     def read(self, consumer, offset=0, size=None, fetch_privkey=False):
979         """
980         I read a portion (possibly all) of the mutable file that I
981         reference into consumer.
982         """
983         return self._do_serialized(self._read, consumer, offset, size,
984                                    fetch_privkey)
985
986
987     def _read(self, consumer, offset=0, size=None, fetch_privkey=False):
988         """
989         I am the serialized companion of read.
990         """
991         r = Retrieve(self._node, self._servermap, self._version, fetch_privkey)
992         if self._history:
993             self._history.notify_retrieve(r.get_status())
994         d = r.download(consumer, offset, size)
995         return d
996
997
998     def _do_serialized(self, cb, *args, **kwargs):
999         # note: to avoid deadlock, this callable is *not* allowed to invoke
1000         # other serialized methods within this (or any other)
1001         # MutableFileNode. The callable should be a bound method of this same
1002         # MFN instance.
1003         d = defer.Deferred()
1004         self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
1005         # we need to put off d.callback until this Deferred is finished being
1006         # processed. Otherwise the caller's subsequent activities (like,
1007         # doing other things with this node) can cause reentrancy problems in
1008         # the Deferred code itself
1009         self._serializer.addBoth(lambda res: eventually(d.callback, res))
1010         # add a log.err just in case something really weird happens, because
1011         # self._serializer stays around forever, therefore we won't see the
1012         # usual Unhandled Error in Deferred that would give us a hint.
1013         self._serializer.addErrback(log.err)
1014         return d
1015
1016
1017     def _upload(self, new_contents):
1018         #assert self._pubkey, "update_servermap must be called before publish"
1019         p = Publish(self._node, self._storage_broker, self._servermap)
1020         if self._history:
1021             self._history.notify_publish(p.get_status(),
1022                                          new_contents.get_size())
1023         d = p.publish(new_contents)
1024         d.addCallback(self._did_upload, new_contents.get_size())
1025         return d
1026
1027
1028     def _did_upload(self, res, size):
1029         self._most_recent_size = size
1030         return res
1031
1032     def update(self, data, offset):
1033         """
1034         Do an update of this mutable file version by inserting data at
1035         offset within the file. If offset is the EOF, this is an append
1036         operation. I return a Deferred that fires with the results of
1037         the update operation when it has completed.
1038
1039         In cases where update does not append any data, or where it does
1040         not append so many blocks that the block count crosses a
1041         power-of-two boundary, this operation will use roughly
1042         O(data.get_size()) memory/bandwidth/CPU to perform the update.
1043         Otherwise, it must download, re-encode, and upload the entire
1044         file again, which will use O(filesize) resources.
1045         """
1046         return self._do_serialized(self._update, data, offset)
1047
1048
1049     def _update(self, data, offset):
1050         """
1051         I update the mutable file version represented by this particular
1052         IMutableVersion by inserting the data in data at the offset
1053         offset. I return a Deferred that fires when this has been
1054         completed.
1055         """
1056         new_size = data.get_size() + offset
1057         old_size = self.get_size()
1058         segment_size = self._version[3]
1059         num_old_segments = mathutil.div_ceil(old_size,
1060                                              segment_size)
1061         num_new_segments = mathutil.div_ceil(new_size,
1062                                              segment_size)
1063         log.msg("got %d old segments, %d new segments" % \
1064                         (num_old_segments, num_new_segments))
1065
1066         # We do a whole file re-encode if the file is an SDMF file. 
1067         if self._version[2]: # version[2] == SDMF salt, which MDMF lacks
1068             log.msg("doing re-encode instead of in-place update")
1069             return self._do_modify_update(data, offset)
1070
1071         # Otherwise, we can replace just the parts that are changing.
1072         log.msg("updating in place")
1073         d = self._do_update_update(data, offset)
1074         d.addCallback(self._decode_and_decrypt_segments, data, offset)
1075         d.addCallback(self._build_uploadable_and_finish, data, offset)
1076         return d
1077
1078
1079     def _do_modify_update(self, data, offset):
1080         """
1081         I perform a file update by modifying the contents of the file
1082         after downloading it, then reuploading it. I am less efficient
1083         than _do_update_update, but am necessary for certain updates.
1084         """
1085         def m(old, servermap, first_time):
1086             start = offset
1087             rest = offset + data.get_size()
1088             new = old[:start]
1089             new += "".join(data.read(data.get_size()))
1090             new += old[rest:]
1091             return new
1092         return self._modify(m, None)
1093
1094
1095     def _do_update_update(self, data, offset):
1096         """
1097         I start the Servermap update that gets us the data we need to
1098         continue the update process. I return a Deferred that fires when
1099         the servermap update is done.
1100         """
1101         assert IMutableUploadable.providedBy(data)
1102         assert self.is_mutable()
1103         # offset == self.get_size() is valid and means that we are
1104         # appending data to the file.
1105         assert offset <= self.get_size()
1106
1107         segsize = self._version[3]
1108         # We'll need the segment that the data starts in, regardless of
1109         # what we'll do later.
1110         start_segment = offset // segsize
1111
1112         # We only need the end segment if the data we append does not go
1113         # beyond the current end-of-file.
1114         end_segment = start_segment
1115         if offset + data.get_size() < self.get_size():
1116             end_data = offset + data.get_size()
1117             # The last byte we touch is the end_data'th byte, which is actually
1118             # byte end_data - 1 because bytes are zero-indexed.
1119             end_data -= 1
1120             end_segment = end_data // segsize
1121
1122         self._start_segment = start_segment
1123         self._end_segment = end_segment
1124
1125         # Now ask for the servermap to be updated in MODE_WRITE with
1126         # this update range.
1127         return self._update_servermap(update_range=(start_segment,
1128                                                     end_segment))
1129
1130
1131     def _decode_and_decrypt_segments(self, ignored, data, offset):
1132         """
1133         After the servermap update, I take the encrypted and encoded
1134         data that the servermap fetched while doing its update and
1135         transform it into decoded-and-decrypted plaintext that can be
1136         used by the new uploadable. I return a Deferred that fires with
1137         the segments.
1138         """
1139         r = Retrieve(self._node, self._servermap, self._version)
1140         # decode: takes in our blocks and salts from the servermap,
1141         # returns a Deferred that fires with the corresponding plaintext
1142         # segments. Does not download -- simply takes advantage of
1143         # existing infrastructure within the Retrieve class to avoid
1144         # duplicating code.
1145         sm = self._servermap
1146         # XXX: If the methods in the servermap don't work as
1147         # abstractions, you should rewrite them instead of going around
1148         # them.
1149         update_data = sm.update_data
1150         start_segments = {} # shnum -> start segment
1151         end_segments = {} # shnum -> end segment
1152         blockhashes = {} # shnum -> blockhash tree
1153         for (shnum, data) in update_data.iteritems():
1154             data = [d[1] for d in data if d[0] == self._version]
1155
1156             # Every data entry in our list should now be share shnum for
1157             # a particular version of the mutable file, so all of the
1158             # entries should be identical.
1159             datum = data[0]
1160             assert filter(lambda x: x != datum, data) == []
1161
1162             blockhashes[shnum] = datum[0]
1163             start_segments[shnum] = datum[1]
1164             end_segments[shnum] = datum[2]
1165
1166         d1 = r.decode(start_segments, self._start_segment)
1167         d2 = r.decode(end_segments, self._end_segment)
1168         d3 = defer.succeed(blockhashes)
1169         return deferredutil.gatherResults([d1, d2, d3])
1170
1171
1172     def _build_uploadable_and_finish(self, segments_and_bht, data, offset):
1173         """
1174         After the process has the plaintext segments, I build the
1175         TransformingUploadable that the publisher will eventually
1176         re-upload to the grid. I then invoke the publisher with that
1177         uploadable, and return a Deferred when the publish operation has
1178         completed without issue.
1179         """
1180         u = TransformingUploadable(data, offset,
1181                                    self._version[3],
1182                                    segments_and_bht[0],
1183                                    segments_and_bht[1])
1184         p = Publish(self._node, self._storage_broker, self._servermap)
1185         return p.update(u, offset, segments_and_bht[2], self._version)
1186
1187
1188     def _update_servermap(self, mode=MODE_WRITE, update_range=None):
1189         """
1190         I update the servermap. I return a Deferred that fires when the
1191         servermap update is done.
1192         """
1193         if update_range:
1194             u = ServermapUpdater(self._node, self._storage_broker, Monitor(),
1195                                  self._servermap,
1196                                  mode=mode,
1197                                  update_range=update_range)
1198         else:
1199             u = ServermapUpdater(self._node, self._storage_broker, Monitor(),
1200                                  self._servermap,
1201                                  mode=mode)
1202         return u.update()