]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/filenode.py
clean up uri-vs-cap terminology, emphasize cap instances instead of URI strings
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / filenode.py
1
2 import random
3
4 from zope.interface import implements
5 from twisted.internet import defer, reactor
6 from foolscap.api import eventually
7 from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \
8      ICheckable, ICheckResults, NotEnoughSharesError
9 from allmydata.util import hashutil, log
10 from allmydata.util.assertutil import precondition
11 from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI
12 from allmydata.monitor import Monitor
13 from pycryptopp.cipher.aes import AES
14
15 from publish import Publish
16 from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \
17      ResponseCache, UncoordinatedWriteError
18 from servermap import ServerMap, ServermapUpdater
19 from retrieve import Retrieve
20 from checker import MutableChecker, MutableCheckAndRepairer
21 from repairer import Repairer
22
23
24 class BackoffAgent:
25     # these parameters are copied from foolscap.reconnector, which gets them
26     # from twisted.internet.protocol.ReconnectingClientFactory
27     initialDelay = 1.0
28     factor = 2.7182818284590451 # (math.e)
29     jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
30     maxRetries = 4
31
32     def __init__(self):
33         self._delay = self.initialDelay
34         self._count = 0
35     def delay(self, node, f):
36         self._count += 1
37         if self._count == 4:
38             return f
39         self._delay = self._delay * self.factor
40         self._delay = random.normalvariate(self._delay,
41                                            self._delay * self.jitter)
42         d = defer.Deferred()
43         reactor.callLater(self._delay, d.callback, None)
44         return d
45
46 # use nodemaker.create_mutable_file() to make one of these
47
48 class MutableFileNode:
49     implements(IMutableFileNode, ICheckable)
50
51     def __init__(self, storage_broker, secret_holder,
52                  default_encoding_parameters, history):
53         self._storage_broker = storage_broker
54         self._secret_holder = secret_holder
55         self._default_encoding_parameters = default_encoding_parameters
56         self._history = history
57         self._pubkey = None # filled in upon first read
58         self._privkey = None # filled in if we're mutable
59         # we keep track of the last encoding parameters that we use. These
60         # are updated upon retrieve, and used by publish. If we publish
61         # without ever reading (i.e. overwrite()), then we use these values.
62         self._required_shares = default_encoding_parameters["k"]
63         self._total_shares = default_encoding_parameters["n"]
64         self._sharemap = {} # known shares, shnum-to-[nodeids]
65         self._cache = ResponseCache()
66
67         # all users of this MutableFileNode go through the serializer. This
68         # takes advantage of the fact that Deferreds discard the callbacks
69         # that they're done with, so we can keep using the same Deferred
70         # forever without consuming more and more memory.
71         self._serializer = defer.succeed(None)
72
73     def __repr__(self):
74         if hasattr(self, '_uri'):
75             return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
76         else:
77             return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)
78
79     def init_from_cap(self, filecap):
80         # we have the URI, but we have not yet retrieved the public
81         # verification key, nor things like 'k' or 'N'. If and when someone
82         # wants to get our contents, we'll pull from shares and fill those
83         # in.
84         assert isinstance(filecap, (ReadonlySSKFileURI, WriteableSSKFileURI))
85         self._uri = filecap
86         self._writekey = None
87         if isinstance(filecap, WriteableSSKFileURI):
88             self._writekey = self._uri.writekey
89         self._readkey = self._uri.readkey
90         self._storage_index = self._uri.storage_index
91         self._fingerprint = self._uri.fingerprint
92         # the following values are learned during Retrieval
93         #  self._pubkey
94         #  self._required_shares
95         #  self._total_shares
96         # and these are needed for Publish. They are filled in by Retrieval
97         # if possible, otherwise by the first peer that Publish talks to.
98         self._privkey = None
99         self._encprivkey = None
100         return self
101
102     def create_with_keys(self, (pubkey, privkey), contents):
103         """Call this to create a brand-new mutable file. It will create the
104         shares, find homes for them, and upload the initial contents (created
105         with the same rules as IClient.create_mutable_file() ). Returns a
106         Deferred that fires (with the MutableFileNode instance you should
107         use) when it completes.
108         """
109         self._pubkey, self._privkey = pubkey, privkey
110         pubkey_s = self._pubkey.serialize()
111         privkey_s = self._privkey.serialize()
112         self._writekey = hashutil.ssk_writekey_hash(privkey_s)
113         self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
114         self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
115         self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
116         self._readkey = self._uri.readkey
117         self._storage_index = self._uri.storage_index
118         initial_contents = self._get_initial_contents(contents)
119         return self._upload(initial_contents, None)
120
121     def _get_initial_contents(self, contents):
122         if isinstance(contents, str):
123             return contents
124         if contents is None:
125             return ""
126         assert callable(contents), "%s should be callable, not %s" % \
127                (contents, type(contents))
128         return contents(self)
129
130     def _encrypt_privkey(self, writekey, privkey):
131         enc = AES(writekey)
132         crypttext = enc.process(privkey)
133         return crypttext
134
135     def _decrypt_privkey(self, enc_privkey):
136         enc = AES(self._writekey)
137         privkey = enc.process(enc_privkey)
138         return privkey
139
140     def _populate_pubkey(self, pubkey):
141         self._pubkey = pubkey
142     def _populate_required_shares(self, required_shares):
143         self._required_shares = required_shares
144     def _populate_total_shares(self, total_shares):
145         self._total_shares = total_shares
146
147     def _populate_privkey(self, privkey):
148         self._privkey = privkey
149     def _populate_encprivkey(self, encprivkey):
150         self._encprivkey = encprivkey
151
152
153     def get_write_enabler(self, peerid):
154         assert len(peerid) == 20
155         return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
156     def get_renewal_secret(self, peerid):
157         assert len(peerid) == 20
158         crs = self._secret_holder.get_renewal_secret()
159         frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
160         return hashutil.bucket_renewal_secret_hash(frs, peerid)
161     def get_cancel_secret(self, peerid):
162         assert len(peerid) == 20
163         ccs = self._secret_holder.get_cancel_secret()
164         fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
165         return hashutil.bucket_cancel_secret_hash(fcs, peerid)
166
167     def get_writekey(self):
168         return self._writekey
169     def get_readkey(self):
170         return self._readkey
171     def get_storage_index(self):
172         return self._storage_index
173     def get_privkey(self):
174         return self._privkey
175     def get_encprivkey(self):
176         return self._encprivkey
177     def get_pubkey(self):
178         return self._pubkey
179
180     def get_required_shares(self):
181         return self._required_shares
182     def get_total_shares(self):
183         return self._total_shares
184
185     ####################################
186     # IFilesystemNode
187
188     def get_size(self):
189         return "?" # TODO: this is likely to cause problems, not being an int
190
191     def get_cap(self):
192         return self._uri
193     def get_readcap(self):
194         return self._uri.get_readonly()
195     def get_verify_cap(self):
196         return IMutableFileURI(self._uri).get_verify_cap()
197     def get_repair_cap(self):
198         if self._uri.is_readonly():
199             return None
200         return self._uri
201
202     def get_uri(self):
203         return self._uri.to_string()
204     def get_readonly_uri(self):
205         return self._uri.get_readonly().to_string()
206
207     def get_readonly(self):
208         if self.is_readonly():
209             return self
210         ro = MutableFileNode(self._storage_broker, self._secret_holder,
211                              self._default_encoding_parameters, self._history)
212         ro.init_from_cap(self._uri.get_readonly())
213         return ro
214
215     def is_mutable(self):
216         return self._uri.is_mutable()
217     def is_readonly(self):
218         return self._uri.is_readonly()
219
220     def __hash__(self):
221         return hash((self.__class__, self._uri))
222     def __cmp__(self, them):
223         if cmp(type(self), type(them)):
224             return cmp(type(self), type(them))
225         if cmp(self.__class__, them.__class__):
226             return cmp(self.__class__, them.__class__)
227         return cmp(self._uri, them._uri)
228
229     def _do_serialized(self, cb, *args, **kwargs):
230         # note: to avoid deadlock, this callable is *not* allowed to invoke
231         # other serialized methods within this (or any other)
232         # MutableFileNode. The callable should be a bound method of this same
233         # MFN instance.
234         d = defer.Deferred()
235         self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
236         # we need to put off d.callback until this Deferred is finished being
237         # processed. Otherwise the caller's subsequent activities (like,
238         # doing other things with this node) can cause reentrancy problems in
239         # the Deferred code itself
240         self._serializer.addBoth(lambda res: eventually(d.callback, res))
241         # add a log.err just in case something really weird happens, because
242         # self._serializer stays around forever, therefore we won't see the
243         # usual Unhandled Error in Deferred that would give us a hint.
244         self._serializer.addErrback(log.err)
245         return d
246
247     #################################
248     # ICheckable
249
250     def check(self, monitor, verify=False, add_lease=False):
251         checker = MutableChecker(self, self._storage_broker,
252                                  self._history, monitor)
253         return checker.check(verify, add_lease)
254
255     def check_and_repair(self, monitor, verify=False, add_lease=False):
256         checker = MutableCheckAndRepairer(self, self._storage_broker,
257                                           self._history, monitor)
258         return checker.check(verify, add_lease)
259
260     #################################
261     # IRepairable
262
263     def repair(self, check_results, force=False):
264         assert ICheckResults(check_results)
265         r = Repairer(self, check_results)
266         d = r.start(force)
267         return d
268
269
270     #################################
271     # IMutableFileNode
272
273     # allow the use of IDownloadTarget
274     def download(self, target):
275         # fake it. TODO: make this cleaner.
276         d = self.download_best_version()
277         def _done(data):
278             target.open(len(data))
279             target.write(data)
280             target.close()
281             return target.finish()
282         d.addCallback(_done)
283         return d
284
285
286     # new API
287
288     def download_best_version(self):
289         return self._do_serialized(self._download_best_version)
290     def _download_best_version(self):
291         servermap = ServerMap()
292         d = self._try_once_to_download_best_version(servermap, MODE_READ)
293         def _maybe_retry(f):
294             f.trap(NotEnoughSharesError)
295             # the download is worth retrying once. Make sure to use the
296             # old servermap, since it is what remembers the bad shares,
297             # but use MODE_WRITE to make it look for even more shares.
298             # TODO: consider allowing this to retry multiple times.. this
299             # approach will let us tolerate about 8 bad shares, I think.
300             return self._try_once_to_download_best_version(servermap,
301                                                            MODE_WRITE)
302         d.addErrback(_maybe_retry)
303         return d
304     def _try_once_to_download_best_version(self, servermap, mode):
305         d = self._update_servermap(servermap, mode)
306         d.addCallback(self._once_updated_download_best_version, servermap)
307         return d
308     def _once_updated_download_best_version(self, ignored, servermap):
309         goal = servermap.best_recoverable_version()
310         if not goal:
311             raise UnrecoverableFileError("no recoverable versions")
312         return self._try_once_to_download_version(servermap, goal)
313
314     def get_size_of_best_version(self):
315         d = self.get_servermap(MODE_READ)
316         def _got_servermap(smap):
317             ver = smap.best_recoverable_version()
318             if not ver:
319                 raise UnrecoverableFileError("no recoverable version")
320             return smap.size_of_version(ver)
321         d.addCallback(_got_servermap)
322         return d
323
324     def overwrite(self, new_contents):
325         return self._do_serialized(self._overwrite, new_contents)
326     def _overwrite(self, new_contents):
327         servermap = ServerMap()
328         d = self._update_servermap(servermap, mode=MODE_WRITE)
329         d.addCallback(lambda ignored: self._upload(new_contents, servermap))
330         return d
331
332
333     def modify(self, modifier, backoffer=None):
334         """I use a modifier callback to apply a change to the mutable file.
335         I implement the following pseudocode::
336
337          obtain_mutable_filenode_lock()
338          first_time = True
339          while True:
340            update_servermap(MODE_WRITE)
341            old = retrieve_best_version()
342            new = modifier(old, servermap, first_time)
343            first_time = False
344            if new == old: break
345            try:
346              publish(new)
347            except UncoordinatedWriteError, e:
348              backoffer(e)
349              continue
350            break
351          release_mutable_filenode_lock()
352
353         The idea is that your modifier function can apply a delta of some
354         sort, and it will be re-run as necessary until it succeeds. The
355         modifier must inspect the old version to see whether its delta has
356         already been applied: if so it should return the contents unmodified.
357
358         Note that the modifier is required to run synchronously, and must not
359         invoke any methods on this MutableFileNode instance.
360
361         The backoff-er is a callable that is responsible for inserting a
362         random delay between subsequent attempts, to help competing updates
363         from colliding forever. It is also allowed to give up after a while.
364         The backoffer is given two arguments: this MutableFileNode, and the
365         Failure object that contains the UncoordinatedWriteError. It should
366         return a Deferred that will fire when the next attempt should be
367         made, or return the Failure if the loop should give up. If
368         backoffer=None, a default one is provided which will perform
369         exponential backoff, and give up after 4 tries. Note that the
370         backoffer should not invoke any methods on this MutableFileNode
371         instance, and it needs to be highly conscious of deadlock issues.
372         """
373         return self._do_serialized(self._modify, modifier, backoffer)
374     def _modify(self, modifier, backoffer):
375         servermap = ServerMap()
376         if backoffer is None:
377             backoffer = BackoffAgent().delay
378         return self._modify_and_retry(servermap, modifier, backoffer, True)
379     def _modify_and_retry(self, servermap, modifier, backoffer, first_time):
380         d = self._modify_once(servermap, modifier, first_time)
381         def _retry(f):
382             f.trap(UncoordinatedWriteError)
383             d2 = defer.maybeDeferred(backoffer, self, f)
384             d2.addCallback(lambda ignored:
385                            self._modify_and_retry(servermap, modifier,
386                                                   backoffer, False))
387             return d2
388         d.addErrback(_retry)
389         return d
390     def _modify_once(self, servermap, modifier, first_time):
391         d = self._update_servermap(servermap, MODE_WRITE)
392         d.addCallback(self._once_updated_download_best_version, servermap)
393         def _apply(old_contents):
394             new_contents = modifier(old_contents, servermap, first_time)
395             if new_contents is None or new_contents == old_contents:
396                 # no changes need to be made
397                 if first_time:
398                     return
399                 # However, since Publish is not automatically doing a
400                 # recovery when it observes UCWE, we need to do a second
401                 # publish. See #551 for details. We'll basically loop until
402                 # we managed an uncontested publish.
403                 new_contents = old_contents
404             precondition(isinstance(new_contents, str),
405                          "Modifier function must return a string or None")
406             return self._upload(new_contents, servermap)
407         d.addCallback(_apply)
408         return d
409
410     def get_servermap(self, mode):
411         return self._do_serialized(self._get_servermap, mode)
412     def _get_servermap(self, mode):
413         servermap = ServerMap()
414         return self._update_servermap(servermap, mode)
415     def _update_servermap(self, servermap, mode):
416         u = ServermapUpdater(self, self._storage_broker, Monitor(), servermap,
417                              mode)
418         if self._history:
419             self._history.notify_mapupdate(u.get_status())
420         return u.update()
421
422     def download_version(self, servermap, version, fetch_privkey=False):
423         return self._do_serialized(self._try_once_to_download_version,
424                                    servermap, version, fetch_privkey)
425     def _try_once_to_download_version(self, servermap, version,
426                                       fetch_privkey=False):
427         r = Retrieve(self, servermap, version, fetch_privkey)
428         if self._history:
429             self._history.notify_retrieve(r.get_status())
430         return r.download()
431
432     def upload(self, new_contents, servermap):
433         return self._do_serialized(self._upload, new_contents, servermap)
434     def _upload(self, new_contents, servermap):
435         assert self._pubkey, "update_servermap must be called before publish"
436         p = Publish(self, self._storage_broker, servermap)
437         if self._history:
438             self._history.notify_publish(p.get_status(), len(new_contents))
439         return p.publish(new_contents)