]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/filenode.py
Flesh out "tahoe magic-folder status" command
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / filenode.py
1
2 import random
3
4 from zope.interface import implements
5 from twisted.internet import defer, reactor
6 from foolscap.api import eventually
7 from allmydata.interfaces import IMutableFileNode, ICheckable, ICheckResults, \
8      NotEnoughSharesError, MDMF_VERSION, SDMF_VERSION, IMutableUploadable, \
9      IMutableFileVersion, IWriteable
10 from allmydata.util import hashutil, log, consumer, deferredutil, mathutil
11 from allmydata.util.assertutil import precondition
12 from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI, \
13                           WriteableMDMFFileURI, ReadonlyMDMFFileURI
14 from allmydata.monitor import Monitor
15 from pycryptopp.cipher.aes import AES
16
17 from allmydata.mutable.publish import Publish, MutableData,\
18                                       TransformingUploadable
19 from allmydata.mutable.common import MODE_READ, MODE_WRITE, MODE_CHECK, UnrecoverableFileError, \
20      UncoordinatedWriteError
21 from allmydata.mutable.servermap import ServerMap, ServermapUpdater
22 from allmydata.mutable.retrieve import Retrieve
23 from allmydata.mutable.checker import MutableChecker, MutableCheckAndRepairer
24 from allmydata.mutable.repairer import Repairer
25
26
27 class BackoffAgent:
28     # these parameters are copied from foolscap.reconnector, which gets them
29     # from twisted.internet.protocol.ReconnectingClientFactory
30     initialDelay = 1.0
31     factor = 2.7182818284590451 # (math.e)
32     jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
33     maxRetries = 4
34
35     def __init__(self):
36         self._delay = self.initialDelay
37         self._count = 0
38     def delay(self, node, f):
39         self._count += 1
40         if self._count == 4:
41             return f
42         self._delay = self._delay * self.factor
43         self._delay = random.normalvariate(self._delay,
44                                            self._delay * self.jitter)
45         d = defer.Deferred()
46         reactor.callLater(self._delay, d.callback, None)
47         return d
48
49 # use nodemaker.create_mutable_file() to make one of these
50
51 class MutableFileNode:
52     implements(IMutableFileNode, ICheckable)
53
54     def __init__(self, storage_broker, secret_holder,
55                  default_encoding_parameters, history):
56         self._storage_broker = storage_broker
57         self._secret_holder = secret_holder
58         self._default_encoding_parameters = default_encoding_parameters
59         self._history = history
60         self._pubkey = None # filled in upon first read
61         self._privkey = None # filled in if we're mutable
62         # we keep track of the last encoding parameters that we use. These
63         # are updated upon retrieve, and used by publish. If we publish
64         # without ever reading (i.e. overwrite()), then we use these values.
65         self._required_shares = default_encoding_parameters["k"]
66         self._total_shares = default_encoding_parameters["n"]
67         self._sharemap = {} # known shares, shnum-to-[nodeids]
68         self._most_recent_size = None
69         # filled in after __init__ if we're being created for the first time;
70         # filled in by the servermap updater before publishing, otherwise.
71         # set to this default value in case neither of those things happen,
72         # or in case the servermap can't find any shares to tell us what
73         # to publish as.
74         self._protocol_version = None
75
76         # all users of this MutableFileNode go through the serializer. This
77         # takes advantage of the fact that Deferreds discard the callbacks
78         # that they're done with, so we can keep using the same Deferred
79         # forever without consuming more and more memory.
80         self._serializer = defer.succeed(None)
81
82         # Starting with MDMF, we can get these from caps if they're
83         # there. Leave them alone for now; they'll be filled in by my
84         # init_from_cap method if necessary.
85         self._downloader_hints = {}
86
87     def __repr__(self):
88         if hasattr(self, '_uri'):
89             return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
90         else:
91             return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)
92
93     def init_from_cap(self, filecap):
94         # we have the URI, but we have not yet retrieved the public
95         # verification key, nor things like 'k' or 'N'. If and when someone
96         # wants to get our contents, we'll pull from shares and fill those
97         # in.
98         if isinstance(filecap, (WriteableMDMFFileURI, ReadonlyMDMFFileURI)):
99             self._protocol_version = MDMF_VERSION
100         elif isinstance(filecap, (ReadonlySSKFileURI, WriteableSSKFileURI)):
101             self._protocol_version = SDMF_VERSION
102
103         self._uri = filecap
104         self._writekey = None
105
106         if not filecap.is_readonly() and filecap.is_mutable():
107             self._writekey = self._uri.writekey
108         self._readkey = self._uri.readkey
109         self._storage_index = self._uri.storage_index
110         self._fingerprint = self._uri.fingerprint
111         # the following values are learned during Retrieval
112         #  self._pubkey
113         #  self._required_shares
114         #  self._total_shares
115         # and these are needed for Publish. They are filled in by Retrieval
116         # if possible, otherwise by the first peer that Publish talks to.
117         self._privkey = None
118         self._encprivkey = None
119
120         return self
121
122     def create_with_keys(self, (pubkey, privkey), contents,
123                          version=SDMF_VERSION):
124         """Call this to create a brand-new mutable file. It will create the
125         shares, find homes for them, and upload the initial contents (created
126         with the same rules as IClient.create_mutable_file() ). Returns a
127         Deferred that fires (with the MutableFileNode instance you should
128         use) when it completes.
129         """
130         self._pubkey, self._privkey = pubkey, privkey
131         pubkey_s = self._pubkey.serialize()
132         privkey_s = self._privkey.serialize()
133         self._writekey = hashutil.ssk_writekey_hash(privkey_s)
134         self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
135         self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
136         if version == MDMF_VERSION:
137             self._uri = WriteableMDMFFileURI(self._writekey, self._fingerprint)
138             self._protocol_version = version
139         elif version == SDMF_VERSION:
140             self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
141             self._protocol_version = version
142         self._readkey = self._uri.readkey
143         self._storage_index = self._uri.storage_index
144         initial_contents = self._get_initial_contents(contents)
145         return self._upload(initial_contents, None)
146
147     def _get_initial_contents(self, contents):
148         if contents is None:
149             return MutableData("")
150
151         if isinstance(contents, str):
152             return MutableData(contents)
153
154         if IMutableUploadable.providedBy(contents):
155             return contents
156
157         assert callable(contents), "%s should be callable, not %s" % \
158                (contents, type(contents))
159         return contents(self)
160
161     def _encrypt_privkey(self, writekey, privkey):
162         enc = AES(writekey)
163         crypttext = enc.process(privkey)
164         return crypttext
165
166     def _decrypt_privkey(self, enc_privkey):
167         enc = AES(self._writekey)
168         privkey = enc.process(enc_privkey)
169         return privkey
170
171     def _populate_pubkey(self, pubkey):
172         self._pubkey = pubkey
173     def _populate_required_shares(self, required_shares):
174         self._required_shares = required_shares
175     def _populate_total_shares(self, total_shares):
176         self._total_shares = total_shares
177
178     def _populate_privkey(self, privkey):
179         self._privkey = privkey
180     def _populate_encprivkey(self, encprivkey):
181         self._encprivkey = encprivkey
182
183     def get_write_enabler(self, server):
184         seed = server.get_foolscap_write_enabler_seed()
185         assert len(seed) == 20
186         return hashutil.ssk_write_enabler_hash(self._writekey, seed)
187     def get_renewal_secret(self, server):
188         crs = self._secret_holder.get_renewal_secret()
189         frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
190         lease_seed = server.get_lease_seed()
191         assert len(lease_seed) == 20
192         return hashutil.bucket_renewal_secret_hash(frs, lease_seed)
193     def get_cancel_secret(self, server):
194         ccs = self._secret_holder.get_cancel_secret()
195         fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
196         lease_seed = server.get_lease_seed()
197         assert len(lease_seed) == 20
198         return hashutil.bucket_cancel_secret_hash(fcs, lease_seed)
199
200     def get_writekey(self):
201         return self._writekey
202     def get_readkey(self):
203         return self._readkey
204     def get_storage_index(self):
205         return self._storage_index
206     def get_fingerprint(self):
207         return self._fingerprint
208     def get_privkey(self):
209         return self._privkey
210     def get_encprivkey(self):
211         return self._encprivkey
212     def get_pubkey(self):
213         return self._pubkey
214
215     def get_required_shares(self):
216         return self._required_shares
217     def get_total_shares(self):
218         return self._total_shares
219
220     ####################################
221     # IFilesystemNode
222
223     def get_size(self):
224         return self._most_recent_size
225
226     def get_current_size(self):
227         d = self.get_size_of_best_version()
228         d.addCallback(self._stash_size)
229         return d
230
231     def _stash_size(self, size):
232         self._most_recent_size = size
233         return size
234
235     def get_cap(self):
236         return self._uri
237     def get_readcap(self):
238         return self._uri.get_readonly()
239     def get_verify_cap(self):
240         return self._uri.get_verify_cap()
241     def get_repair_cap(self):
242         if self._uri.is_readonly():
243             return None
244         return self._uri
245
246     def get_uri(self):
247         return self._uri.to_string()
248
249     def get_write_uri(self):
250         if self.is_readonly():
251             return None
252         return self._uri.to_string()
253
254     def get_readonly_uri(self):
255         return self._uri.get_readonly().to_string()
256
257     def get_readonly(self):
258         if self.is_readonly():
259             return self
260         ro = MutableFileNode(self._storage_broker, self._secret_holder,
261                              self._default_encoding_parameters, self._history)
262         ro.init_from_cap(self._uri.get_readonly())
263         return ro
264
265     def is_mutable(self):
266         return self._uri.is_mutable()
267
268     def is_readonly(self):
269         return self._uri.is_readonly()
270
271     def is_unknown(self):
272         return False
273
274     def is_allowed_in_immutable_directory(self):
275         return not self._uri.is_mutable()
276
277     def raise_error(self):
278         pass
279
280     def __hash__(self):
281         return hash((self.__class__, self._uri))
282     def __cmp__(self, them):
283         if cmp(type(self), type(them)):
284             return cmp(type(self), type(them))
285         if cmp(self.__class__, them.__class__):
286             return cmp(self.__class__, them.__class__)
287         return cmp(self._uri, them._uri)
288
289
290     #################################
291     # ICheckable
292
293     def check(self, monitor, verify=False, add_lease=False):
294         checker = MutableChecker(self, self._storage_broker,
295                                  self._history, monitor)
296         return checker.check(verify, add_lease)
297
298     def check_and_repair(self, monitor, verify=False, add_lease=False):
299         checker = MutableCheckAndRepairer(self, self._storage_broker,
300                                           self._history, monitor)
301         return checker.check(verify, add_lease)
302
303     #################################
304     # IRepairable
305
306     def repair(self, check_results, force=False, monitor=None):
307         assert ICheckResults(check_results)
308         r = Repairer(self, check_results, self._storage_broker,
309                      self._history, monitor)
310         d = r.start(force)
311         return d
312
313
314     #################################
315     # IFileNode
316
317     def get_best_readable_version(self):
318         """
319         I return a Deferred that fires with a MutableFileVersion
320         representing the best readable version of the file that I
321         represent
322         """
323         return self.get_readable_version()
324
325
326     def get_readable_version(self, servermap=None, version=None):
327         """
328         I return a Deferred that fires with an MutableFileVersion for my
329         version argument, if there is a recoverable file of that version
330         on the grid. If there is no recoverable version, I fire with an
331         UnrecoverableFileError.
332
333         If a servermap is provided, I look in there for the requested
334         version. If no servermap is provided, I create and update a new
335         one.
336
337         If no version is provided, then I return a MutableFileVersion
338         representing the best recoverable version of the file.
339         """
340         d = self._get_version_from_servermap(MODE_READ, servermap, version)
341         def _build_version((servermap, their_version)):
342             assert their_version in servermap.recoverable_versions()
343             assert their_version in servermap.make_versionmap()
344
345             mfv = MutableFileVersion(self,
346                                      servermap,
347                                      their_version,
348                                      self._storage_index,
349                                      self._storage_broker,
350                                      self._readkey,
351                                      history=self._history)
352             assert mfv.is_readonly()
353             mfv.set_downloader_hints(self._downloader_hints)
354             # our caller can use this to download the contents of the
355             # mutable file.
356             return mfv
357         return d.addCallback(_build_version)
358
359
360     def _get_version_from_servermap(self,
361                                     mode,
362                                     servermap=None,
363                                     version=None):
364         """
365         I return a Deferred that fires with (servermap, version).
366
367         This function performs validation and a servermap update. If it
368         returns (servermap, version), the caller can assume that:
369             - servermap was last updated in mode.
370             - version is recoverable, and corresponds to the servermap.
371
372         If version and servermap are provided to me, I will validate
373         that version exists in the servermap, and that the servermap was
374         updated correctly.
375
376         If version is not provided, but servermap is, I will validate
377         the servermap and return the best recoverable version that I can
378         find in the servermap.
379
380         If the version is provided but the servermap isn't, I will
381         obtain a servermap that has been updated in the correct mode and
382         validate that version is found and recoverable.
383
384         If neither servermap nor version are provided, I will obtain a
385         servermap updated in the correct mode, and return the best
386         recoverable version that I can find in there.
387         """
388         # XXX: wording ^^^^
389         if servermap and servermap.get_last_update()[0] == mode:
390             d = defer.succeed(servermap)
391         else:
392             d = self._get_servermap(mode)
393
394         def _get_version(servermap, v):
395             if v and v not in servermap.recoverable_versions():
396                 v = None
397             elif not v:
398                 v = servermap.best_recoverable_version()
399             if not v:
400                 raise UnrecoverableFileError("no recoverable versions")
401
402             return (servermap, v)
403         return d.addCallback(_get_version, version)
404
405
406     def download_best_version(self, progress=None):
407         """
408         I return a Deferred that fires with the contents of the best
409         version of this mutable file.
410         """
411         return self._do_serialized(self._download_best_version, progress=progress)
412
413
414     def _download_best_version(self, progress=None):
415         """
416         I am the serialized sibling of download_best_version.
417         """
418         d = self.get_best_readable_version()
419         d.addCallback(self._record_size)
420         d.addCallback(lambda version: version.download_to_data(progress=progress))
421
422         # It is possible that the download will fail because there
423         # aren't enough shares to be had. If so, we will try again after
424         # updating the servermap in MODE_WRITE, which may find more
425         # shares than updating in MODE_READ, as we just did. We can do
426         # this by getting the best mutable version and downloading from
427         # that -- the best mutable version will be a MutableFileVersion
428         # with a servermap that was last updated in MODE_WRITE, as we
429         # want. If this fails, then we give up.
430         def _maybe_retry(failure):
431             failure.trap(NotEnoughSharesError)
432
433             d = self.get_best_mutable_version()
434             d.addCallback(self._record_size)
435             d.addCallback(lambda version: version.download_to_data(progress=progress))
436             return d
437
438         d.addErrback(_maybe_retry)
439         return d
440
441
442     def _record_size(self, mfv):
443         """
444         I record the size of a mutable file version.
445         """
446         self._most_recent_size = mfv.get_size()
447         return mfv
448
449
450     def get_size_of_best_version(self):
451         """
452         I return the size of the best version of this mutable file.
453
454         This is equivalent to calling get_size() on the result of
455         get_best_readable_version().
456         """
457         d = self.get_best_readable_version()
458         return d.addCallback(lambda mfv: mfv.get_size())
459
460
461     #################################
462     # IMutableFileNode
463
464     def get_best_mutable_version(self, servermap=None):
465         """
466         I return a Deferred that fires with a MutableFileVersion
467         representing the best readable version of the file that I
468         represent. I am like get_best_readable_version, except that I
469         will try to make a writeable version if I can.
470         """
471         return self.get_mutable_version(servermap=servermap)
472
473
474     def get_mutable_version(self, servermap=None, version=None):
475         """
476         I return a version of this mutable file. I return a Deferred
477         that fires with a MutableFileVersion
478
479         If version is provided, the Deferred will fire with a
480         MutableFileVersion initailized with that version. Otherwise, it
481         will fire with the best version that I can recover.
482
483         If servermap is provided, I will use that to find versions
484         instead of performing my own servermap update.
485         """
486         if self.is_readonly():
487             return self.get_readable_version(servermap=servermap,
488                                              version=version)
489
490         # get_mutable_version => write intent, so we require that the
491         # servermap is updated in MODE_WRITE
492         d = self._get_version_from_servermap(MODE_WRITE, servermap, version)
493         def _build_version((servermap, smap_version)):
494             # these should have been set by the servermap update.
495             assert self._secret_holder
496             assert self._writekey
497
498             mfv = MutableFileVersion(self,
499                                      servermap,
500                                      smap_version,
501                                      self._storage_index,
502                                      self._storage_broker,
503                                      self._readkey,
504                                      self._writekey,
505                                      self._secret_holder,
506                                      history=self._history)
507             assert not mfv.is_readonly()
508             mfv.set_downloader_hints(self._downloader_hints)
509             return mfv
510
511         return d.addCallback(_build_version)
512
513
514     # XXX: I'm uncomfortable with the difference between upload and
515     #      overwrite, which, FWICT, is basically that you don't have to
516     #      do a servermap update before you overwrite. We split them up
517     #      that way anyway, so I guess there's no real difficulty in
518     #      offering both ways to callers, but it also makes the
519     #      public-facing API cluttery, and makes it hard to discern the
520     #      right way of doing things.
521
522     # In general, we leave it to callers to ensure that they aren't
523     # going to cause UncoordinatedWriteErrors when working with
524     # MutableFileVersions. We know that the next three operations
525     # (upload, overwrite, and modify) will all operate on the same
526     # version, so we say that only one of them can be going on at once,
527     # and serialize them to ensure that that actually happens, since as
528     # the caller in this situation it is our job to do that.
529     def overwrite(self, new_contents):
530         """
531         I overwrite the contents of the best recoverable version of this
532         mutable file with new_contents. This is equivalent to calling
533         overwrite on the result of get_best_mutable_version with
534         new_contents as an argument. I return a Deferred that eventually
535         fires with the results of my replacement process.
536         """
537         # TODO: Update downloader hints.
538         return self._do_serialized(self._overwrite, new_contents)
539
540
541     def _overwrite(self, new_contents):
542         """
543         I am the serialized sibling of overwrite.
544         """
545         d = self.get_best_mutable_version()
546         d.addCallback(lambda mfv: mfv.overwrite(new_contents))
547         d.addCallback(self._did_upload, new_contents.get_size())
548         return d
549
550
551     def upload(self, new_contents, servermap):
552         """
553         I overwrite the contents of the best recoverable version of this
554         mutable file with new_contents, using servermap instead of
555         creating/updating our own servermap. I return a Deferred that
556         fires with the results of my upload.
557         """
558         # TODO: Update downloader hints
559         return self._do_serialized(self._upload, new_contents, servermap)
560
561
562     def modify(self, modifier, backoffer=None):
563         """
564         I modify the contents of the best recoverable version of this
565         mutable file with the modifier. This is equivalent to calling
566         modify on the result of get_best_mutable_version. I return a
567         Deferred that eventually fires with an UploadResults instance
568         describing this process.
569         """
570         # TODO: Update downloader hints.
571         return self._do_serialized(self._modify, modifier, backoffer)
572
573
574     def _modify(self, modifier, backoffer):
575         """
576         I am the serialized sibling of modify.
577         """
578         d = self.get_best_mutable_version()
579         d.addCallback(lambda mfv: mfv.modify(modifier, backoffer))
580         return d
581
582
583     def download_version(self, servermap, version, fetch_privkey=False):
584         """
585         Download the specified version of this mutable file. I return a
586         Deferred that fires with the contents of the specified version
587         as a bytestring, or errbacks if the file is not recoverable.
588         """
589         d = self.get_readable_version(servermap, version)
590         return d.addCallback(lambda mfv: mfv.download_to_data(fetch_privkey))
591
592
593     def get_servermap(self, mode):
594         """
595         I return a servermap that has been updated in mode.
596
597         mode should be one of MODE_READ, MODE_WRITE, MODE_CHECK or
598         MODE_ANYTHING. See servermap.py for more on what these mean.
599         """
600         return self._do_serialized(self._get_servermap, mode)
601
602
603     def _get_servermap(self, mode):
604         """
605         I am a serialized twin to get_servermap.
606         """
607         servermap = ServerMap()
608         d = self._update_servermap(servermap, mode)
609         # The servermap will tell us about the most recent size of the
610         # file, so we may as well set that so that callers might get
611         # more data about us.
612         if not self._most_recent_size:
613             d.addCallback(self._get_size_from_servermap)
614         return d
615
616
617     def _get_size_from_servermap(self, servermap):
618         """
619         I extract the size of the best version of this file and record
620         it in self._most_recent_size. I return the servermap that I was
621         given.
622         """
623         if servermap.recoverable_versions():
624             v = servermap.best_recoverable_version()
625             size = v[4] # verinfo[4] == size
626             self._most_recent_size = size
627         return servermap
628
629
630     def _update_servermap(self, servermap, mode):
631         u = ServermapUpdater(self, self._storage_broker, Monitor(), servermap,
632                              mode)
633         if self._history:
634             self._history.notify_mapupdate(u.get_status())
635         return u.update()
636
637
638     #def set_version(self, version):
639         # I can be set in two ways:
640         #  1. When the node is created.
641         #  2. (for an existing share) when the Servermap is updated
642         #     before I am read.
643     #    assert version in (MDMF_VERSION, SDMF_VERSION)
644     #    self._protocol_version = version
645
646
647     def get_version(self):
648         return self._protocol_version
649
650
651     def _do_serialized(self, cb, *args, **kwargs):
652         # note: to avoid deadlock, this callable is *not* allowed to invoke
653         # other serialized methods within this (or any other)
654         # MutableFileNode. The callable should be a bound method of this same
655         # MFN instance.
656         d = defer.Deferred()
657         self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
658         # we need to put off d.callback until this Deferred is finished being
659         # processed. Otherwise the caller's subsequent activities (like,
660         # doing other things with this node) can cause reentrancy problems in
661         # the Deferred code itself
662         self._serializer.addBoth(lambda res: eventually(d.callback, res))
663         # add a log.err just in case something really weird happens, because
664         # self._serializer stays around forever, therefore we won't see the
665         # usual Unhandled Error in Deferred that would give us a hint.
666         self._serializer.addErrback(log.err)
667         return d
668
669
670     def _upload(self, new_contents, servermap):
671         """
672         A MutableFileNode still has to have some way of getting
673         published initially, which is what I am here for. After that,
674         all publishing, updating, modifying and so on happens through
675         MutableFileVersions.
676         """
677         assert self._pubkey, "update_servermap must be called before publish"
678
679         # Define IPublishInvoker with a set_downloader_hints method?
680         # Then have the publisher call that method when it's done publishing?
681         p = Publish(self, self._storage_broker, servermap)
682         if self._history:
683             self._history.notify_publish(p.get_status(),
684                                          new_contents.get_size())
685         d = p.publish(new_contents)
686         d.addCallback(self._did_upload, new_contents.get_size())
687         return d
688
689
690     def set_downloader_hints(self, hints):
691         self._downloader_hints = hints
692
693     def _did_upload(self, res, size):
694         self._most_recent_size = size
695         return res
696
697
698 class MutableFileVersion:
699     """
700     I represent a specific version (most likely the best version) of a
701     mutable file.
702
703     Since I implement IReadable, instances which hold a
704     reference to an instance of me are guaranteed the ability (absent
705     connection difficulties or unrecoverable versions) to read the file
706     that I represent. Depending on whether I was initialized with a
707     write capability or not, I may also provide callers the ability to
708     overwrite or modify the contents of the mutable file that I
709     reference.
710     """
711     implements(IMutableFileVersion, IWriteable)
712
713     def __init__(self,
714                  node,
715                  servermap,
716                  version,
717                  storage_index,
718                  storage_broker,
719                  readcap,
720                  writekey=None,
721                  write_secrets=None,
722                  history=None):
723
724         self._node = node
725         self._servermap = servermap
726         self._version = version
727         self._storage_index = storage_index
728         self._write_secrets = write_secrets
729         self._history = history
730         self._storage_broker = storage_broker
731
732         #assert isinstance(readcap, IURI)
733         self._readcap = readcap
734
735         self._writekey = writekey
736         self._serializer = defer.succeed(None)
737
738
739     def get_sequence_number(self):
740         """
741         Get the sequence number of the mutable version that I represent.
742         """
743         return self._version[0] # verinfo[0] == the sequence number
744
745
746     # TODO: Terminology?
747     def get_writekey(self):
748         """
749         I return a writekey or None if I don't have a writekey.
750         """
751         return self._writekey
752
753
754     def set_downloader_hints(self, hints):
755         """
756         I set the downloader hints.
757         """
758         assert isinstance(hints, dict)
759
760         self._downloader_hints = hints
761
762
763     def get_downloader_hints(self):
764         """
765         I return the downloader hints.
766         """
767         return self._downloader_hints
768
769
770     def overwrite(self, new_contents):
771         """
772         I overwrite the contents of this mutable file version with the
773         data in new_contents.
774         """
775         assert not self.is_readonly()
776
777         return self._do_serialized(self._overwrite, new_contents)
778
779
780     def _overwrite(self, new_contents):
781         assert IMutableUploadable.providedBy(new_contents)
782         assert self._servermap.get_last_update()[0] == MODE_WRITE
783
784         return self._upload(new_contents)
785
786
787     def modify(self, modifier, backoffer=None):
788         """I use a modifier callback to apply a change to the mutable file.
789         I implement the following pseudocode::
790
791          obtain_mutable_filenode_lock()
792          first_time = True
793          while True:
794            update_servermap(MODE_WRITE)
795            old = retrieve_best_version()
796            new = modifier(old, servermap, first_time)
797            first_time = False
798            if new == old: break
799            try:
800              publish(new)
801            except UncoordinatedWriteError, e:
802              backoffer(e)
803              continue
804            break
805          release_mutable_filenode_lock()
806
807         The idea is that your modifier function can apply a delta of some
808         sort, and it will be re-run as necessary until it succeeds. The
809         modifier must inspect the old version to see whether its delta has
810         already been applied: if so it should return the contents unmodified.
811
812         Note that the modifier is required to run synchronously, and must not
813         invoke any methods on this MutableFileNode instance.
814
815         The backoff-er is a callable that is responsible for inserting a
816         random delay between subsequent attempts, to help competing updates
817         from colliding forever. It is also allowed to give up after a while.
818         The backoffer is given two arguments: this MutableFileNode, and the
819         Failure object that contains the UncoordinatedWriteError. It should
820         return a Deferred that will fire when the next attempt should be
821         made, or return the Failure if the loop should give up. If
822         backoffer=None, a default one is provided which will perform
823         exponential backoff, and give up after 4 tries. Note that the
824         backoffer should not invoke any methods on this MutableFileNode
825         instance, and it needs to be highly conscious of deadlock issues.
826         """
827         assert not self.is_readonly()
828
829         return self._do_serialized(self._modify, modifier, backoffer)
830
831
832     def _modify(self, modifier, backoffer):
833         if backoffer is None:
834             backoffer = BackoffAgent().delay
835         return self._modify_and_retry(modifier, backoffer, True)
836
837
838     def _modify_and_retry(self, modifier, backoffer, first_time):
839         """
840         I try to apply modifier to the contents of this version of the
841         mutable file. If I succeed, I return an UploadResults instance
842         describing my success. If I fail, I try again after waiting for
843         a little bit.
844         """
845         log.msg("doing modify")
846         if first_time:
847             d = self._update_servermap()
848         else:
849             # We ran into trouble; do MODE_CHECK so we're a little more
850             # careful on subsequent tries.
851             d = self._update_servermap(mode=MODE_CHECK)
852
853         d.addCallback(lambda ignored:
854             self._modify_once(modifier, first_time))
855         def _retry(f):
856             f.trap(UncoordinatedWriteError)
857             # Uh oh, it broke. We're allowed to trust the servermap for our
858             # first try, but after that we need to update it. It's
859             # possible that we've failed due to a race with another
860             # uploader, and if the race is to converge correctly, we
861             # need to know about that upload.
862             d2 = defer.maybeDeferred(backoffer, self, f)
863             d2.addCallback(lambda ignored:
864                            self._modify_and_retry(modifier,
865                                                   backoffer, False))
866             return d2
867         d.addErrback(_retry)
868         return d
869
870
871     def _modify_once(self, modifier, first_time):
872         """
873         I attempt to apply a modifier to the contents of the mutable
874         file.
875         """
876         assert self._servermap.get_last_update()[0] != MODE_READ
877
878         # download_to_data is serialized, so we have to call this to
879         # avoid deadlock.
880         d = self._try_to_download_data()
881         def _apply(old_contents):
882             new_contents = modifier(old_contents, self._servermap, first_time)
883             precondition((isinstance(new_contents, str) or
884                           new_contents is None),
885                          "Modifier function must return a string "
886                          "or None")
887
888             if new_contents is None or new_contents == old_contents:
889                 log.msg("no changes")
890                 # no changes need to be made
891                 if first_time:
892                     return
893                 # However, since Publish is not automatically doing a
894                 # recovery when it observes UCWE, we need to do a second
895                 # publish. See #551 for details. We'll basically loop until
896                 # we managed an uncontested publish.
897                 old_uploadable = MutableData(old_contents)
898                 new_contents = old_uploadable
899             else:
900                 new_contents = MutableData(new_contents)
901
902             return self._upload(new_contents)
903         d.addCallback(_apply)
904         return d
905
906
907     def is_readonly(self):
908         """
909         I return True if this MutableFileVersion provides no write
910         access to the file that it encapsulates, and False if it
911         provides the ability to modify the file.
912         """
913         return self._writekey is None
914
915
916     def is_mutable(self):
917         """
918         I return True, since mutable files are always mutable by
919         somebody.
920         """
921         return True
922
923
924     def get_storage_index(self):
925         """
926         I return the storage index of the reference that I encapsulate.
927         """
928         return self._storage_index
929
930
931     def get_size(self):
932         """
933         I return the length, in bytes, of this readable object.
934         """
935         return self._servermap.size_of_version(self._version)
936
937
938     def download_to_data(self, fetch_privkey=False, progress=None):
939         """
940         I return a Deferred that fires with the contents of this
941         readable object as a byte string.
942
943         """
944         c = consumer.MemoryConsumer(progress=progress)
945         d = self.read(c, fetch_privkey=fetch_privkey)
946         d.addCallback(lambda mc: "".join(mc.chunks))
947         return d
948
949
950     def _try_to_download_data(self):
951         """
952         I am an unserialized cousin of download_to_data; I am called
953         from the children of modify() to download the data associated
954         with this mutable version.
955         """
956         c = consumer.MemoryConsumer()
957         # modify will almost certainly write, so we need the privkey.
958         d = self._read(c, fetch_privkey=True)
959         d.addCallback(lambda mc: "".join(mc.chunks))
960         return d
961
962
963     def read(self, consumer, offset=0, size=None, fetch_privkey=False):
964         """
965         I read a portion (possibly all) of the mutable file that I
966         reference into consumer.
967         """
968         return self._do_serialized(self._read, consumer, offset, size,
969                                    fetch_privkey)
970
971
972     def _read(self, consumer, offset=0, size=None, fetch_privkey=False):
973         """
974         I am the serialized companion of read.
975         """
976         r = Retrieve(self._node, self._storage_broker, self._servermap,
977                      self._version, fetch_privkey)
978         if self._history:
979             self._history.notify_retrieve(r.get_status())
980         d = r.download(consumer, offset, size)
981         return d
982
983
984     def _do_serialized(self, cb, *args, **kwargs):
985         # note: to avoid deadlock, this callable is *not* allowed to invoke
986         # other serialized methods within this (or any other)
987         # MutableFileNode. The callable should be a bound method of this same
988         # MFN instance.
989         d = defer.Deferred()
990         self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
991         # we need to put off d.callback until this Deferred is finished being
992         # processed. Otherwise the caller's subsequent activities (like,
993         # doing other things with this node) can cause reentrancy problems in
994         # the Deferred code itself
995         self._serializer.addBoth(lambda res: eventually(d.callback, res))
996         # add a log.err just in case something really weird happens, because
997         # self._serializer stays around forever, therefore we won't see the
998         # usual Unhandled Error in Deferred that would give us a hint.
999         self._serializer.addErrback(log.err)
1000         return d
1001
1002
1003     def _upload(self, new_contents):
1004         #assert self._pubkey, "update_servermap must be called before publish"
1005         p = Publish(self._node, self._storage_broker, self._servermap)
1006         if self._history:
1007             self._history.notify_publish(p.get_status(),
1008                                          new_contents.get_size())
1009         d = p.publish(new_contents)
1010         d.addCallback(self._did_upload, new_contents.get_size())
1011         return d
1012
1013
1014     def _did_upload(self, res, size):
1015         self._most_recent_size = size
1016         return res
1017
1018     def update(self, data, offset):
1019         """
1020         Do an update of this mutable file version by inserting data at
1021         offset within the file. If offset is the EOF, this is an append
1022         operation. I return a Deferred that fires with the results of
1023         the update operation when it has completed.
1024
1025         In cases where update does not append any data, or where it does
1026         not append so many blocks that the block count crosses a
1027         power-of-two boundary, this operation will use roughly
1028         O(data.get_size()) memory/bandwidth/CPU to perform the update.
1029         Otherwise, it must download, re-encode, and upload the entire
1030         file again, which will use O(filesize) resources.
1031         """
1032         return self._do_serialized(self._update, data, offset)
1033
1034
1035     def _update(self, data, offset):
1036         """
1037         I update the mutable file version represented by this particular
1038         IMutableVersion by inserting the data in data at the offset
1039         offset. I return a Deferred that fires when this has been
1040         completed.
1041         """
1042         new_size = data.get_size() + offset
1043         old_size = self.get_size()
1044         segment_size = self._version[3]
1045         num_old_segments = mathutil.div_ceil(old_size,
1046                                              segment_size)
1047         num_new_segments = mathutil.div_ceil(new_size,
1048                                              segment_size)
1049         log.msg("got %d old segments, %d new segments" % \
1050                         (num_old_segments, num_new_segments))
1051
1052         # We do a whole file re-encode if the file is an SDMF file.
1053         if self._version[2]: # version[2] == SDMF salt, which MDMF lacks
1054             log.msg("doing re-encode instead of in-place update")
1055             return self._do_modify_update(data, offset)
1056
1057         # Otherwise, we can replace just the parts that are changing.
1058         log.msg("updating in place")
1059         d = self._do_update_update(data, offset)
1060         d.addCallback(self._decode_and_decrypt_segments, data, offset)
1061         d.addCallback(self._build_uploadable_and_finish, data, offset)
1062         return d
1063
1064
1065     def _do_modify_update(self, data, offset):
1066         """
1067         I perform a file update by modifying the contents of the file
1068         after downloading it, then reuploading it. I am less efficient
1069         than _do_update_update, but am necessary for certain updates.
1070         """
1071         def m(old, servermap, first_time):
1072             start = offset
1073             rest = offset + data.get_size()
1074             new = old[:start]
1075             new += "".join(data.read(data.get_size()))
1076             new += old[rest:]
1077             return new
1078         return self._modify(m, None)
1079
1080
1081     def _do_update_update(self, data, offset):
1082         """
1083         I start the Servermap update that gets us the data we need to
1084         continue the update process. I return a Deferred that fires when
1085         the servermap update is done.
1086         """
1087         assert IMutableUploadable.providedBy(data)
1088         assert self.is_mutable()
1089         # offset == self.get_size() is valid and means that we are
1090         # appending data to the file.
1091         assert offset <= self.get_size()
1092
1093         segsize = self._version[3]
1094         # We'll need the segment that the data starts in, regardless of
1095         # what we'll do later.
1096         start_segment = offset // segsize
1097
1098         # We only need the end segment if the data we append does not go
1099         # beyond the current end-of-file.
1100         end_segment = start_segment
1101         if offset + data.get_size() < self.get_size():
1102             end_data = offset + data.get_size()
1103             # The last byte we touch is the end_data'th byte, which is actually
1104             # byte end_data - 1 because bytes are zero-indexed.
1105             end_data -= 1
1106             end_segment = end_data // segsize
1107
1108         self._start_segment = start_segment
1109         self._end_segment = end_segment
1110
1111         # Now ask for the servermap to be updated in MODE_WRITE with
1112         # this update range.
1113         return self._update_servermap(update_range=(start_segment,
1114                                                     end_segment))
1115
1116
1117     def _decode_and_decrypt_segments(self, ignored, data, offset):
1118         """
1119         After the servermap update, I take the encrypted and encoded
1120         data that the servermap fetched while doing its update and
1121         transform it into decoded-and-decrypted plaintext that can be
1122         used by the new uploadable. I return a Deferred that fires with
1123         the segments.
1124         """
1125         r = Retrieve(self._node, self._storage_broker, self._servermap,
1126                      self._version)
1127         # decode: takes in our blocks and salts from the servermap,
1128         # returns a Deferred that fires with the corresponding plaintext
1129         # segments. Does not download -- simply takes advantage of
1130         # existing infrastructure within the Retrieve class to avoid
1131         # duplicating code.
1132         sm = self._servermap
1133         # XXX: If the methods in the servermap don't work as
1134         # abstractions, you should rewrite them instead of going around
1135         # them.
1136         update_data = sm.update_data
1137         start_segments = {} # shnum -> start segment
1138         end_segments = {} # shnum -> end segment
1139         blockhashes = {} # shnum -> blockhash tree
1140         for (shnum, original_data) in update_data.iteritems():
1141             data = [d[1] for d in original_data if d[0] == self._version]
1142             # data is [(blockhashes,start,end)..]
1143
1144             # Every data entry in our list should now be share shnum for
1145             # a particular version of the mutable file, so all of the
1146             # entries should be identical.
1147             datum = data[0]
1148             assert [x for x in data if x != datum] == []
1149
1150             # datum is (blockhashes,start,end)
1151             blockhashes[shnum] = datum[0]
1152             start_segments[shnum] = datum[1] # (block,salt) bytestrings
1153             end_segments[shnum] = datum[2]
1154
1155         d1 = r.decode(start_segments, self._start_segment)
1156         d2 = r.decode(end_segments, self._end_segment)
1157         d3 = defer.succeed(blockhashes)
1158         return deferredutil.gatherResults([d1, d2, d3])
1159
1160
1161     def _build_uploadable_and_finish(self, segments_and_bht, data, offset):
1162         """
1163         After the process has the plaintext segments, I build the
1164         TransformingUploadable that the publisher will eventually
1165         re-upload to the grid. I then invoke the publisher with that
1166         uploadable, and return a Deferred when the publish operation has
1167         completed without issue.
1168         """
1169         u = TransformingUploadable(data, offset,
1170                                    self._version[3],
1171                                    segments_and_bht[0],
1172                                    segments_and_bht[1])
1173         p = Publish(self._node, self._storage_broker, self._servermap)
1174         return p.update(u, offset, segments_and_bht[2], self._version)
1175
1176
1177     def _update_servermap(self, mode=MODE_WRITE, update_range=None):
1178         """
1179         I update the servermap. I return a Deferred that fires when the
1180         servermap update is done.
1181         """
1182         if update_range:
1183             u = ServermapUpdater(self._node, self._storage_broker, Monitor(),
1184                                  self._servermap,
1185                                  mode=mode,
1186                                  update_range=update_range)
1187         else:
1188             u = ServermapUpdater(self._node, self._storage_broker, Monitor(),
1189                                  self._servermap,
1190                                  mode=mode)
1191         return u.update()