]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/filenode.py
afd71ffd0429c205c55842cba69d6e1a30f9fde9
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / filenode.py
1
2 import random
3
4 from zope.interface import implements
5 from twisted.internet import defer, reactor
6 from foolscap.api import eventually
7 from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \
8      ICheckable, ICheckResults, NotEnoughSharesError
9 from allmydata.util import hashutil, log
10 from allmydata.util.assertutil import precondition
11 from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI
12 from allmydata.monitor import Monitor
13 from pycryptopp.cipher.aes import AES
14
15 from publish import Publish
16 from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \
17      ResponseCache, UncoordinatedWriteError
18 from servermap import ServerMap, ServermapUpdater
19 from retrieve import Retrieve
20 from checker import MutableChecker, MutableCheckAndRepairer
21 from repairer import Repairer
22
23
24 class BackoffAgent:
25     # these parameters are copied from foolscap.reconnector, which gets them
26     # from twisted.internet.protocol.ReconnectingClientFactory
27     initialDelay = 1.0
28     factor = 2.7182818284590451 # (math.e)
29     jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
30     maxRetries = 4
31
32     def __init__(self):
33         self._delay = self.initialDelay
34         self._count = 0
35     def delay(self, node, f):
36         self._count += 1
37         if self._count == 4:
38             return f
39         self._delay = self._delay * self.factor
40         self._delay = random.normalvariate(self._delay,
41                                            self._delay * self.jitter)
42         d = defer.Deferred()
43         reactor.callLater(self._delay, d.callback, None)
44         return d
45
46 # use nodemaker.create_mutable_file() to make one of these
47
48 class MutableFileNode:
49     implements(IMutableFileNode, ICheckable)
50
51     def __init__(self, storage_broker, secret_holder,
52                  default_encoding_parameters, history):
53         self._storage_broker = storage_broker
54         self._secret_holder = secret_holder
55         self._default_encoding_parameters = default_encoding_parameters
56         self._history = history
57         self._pubkey = None # filled in upon first read
58         self._privkey = None # filled in if we're mutable
59         # we keep track of the last encoding parameters that we use. These
60         # are updated upon retrieve, and used by publish. If we publish
61         # without ever reading (i.e. overwrite()), then we use these values.
62         self._required_shares = default_encoding_parameters["k"]
63         self._total_shares = default_encoding_parameters["n"]
64         self._sharemap = {} # known shares, shnum-to-[nodeids]
65         self._cache = ResponseCache()
66
67         # all users of this MutableFileNode go through the serializer. This
68         # takes advantage of the fact that Deferreds discard the callbacks
69         # that they're done with, so we can keep using the same Deferred
70         # forever without consuming more and more memory.
71         self._serializer = defer.succeed(None)
72
73     def __repr__(self):
74         if hasattr(self, '_uri'):
75             return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
76         else:
77             return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)
78
79     def init_from_uri(self, filecap):
80         # we have the URI, but we have not yet retrieved the public
81         # verification key, nor things like 'k' or 'N'. If and when someone
82         # wants to get our contents, we'll pull from shares and fill those
83         # in.
84         assert isinstance(filecap, str)
85         if filecap.startswith("URI:SSK:"):
86             self._uri = WriteableSSKFileURI.init_from_string(filecap)
87             self._writekey = self._uri.writekey
88         else:
89             assert filecap.startswith("URI:SSK-RO:")
90             self._uri = ReadonlySSKFileURI.init_from_string(filecap)
91             self._writekey = None
92         self._readkey = self._uri.readkey
93         self._storage_index = self._uri.storage_index
94         self._fingerprint = self._uri.fingerprint
95         # the following values are learned during Retrieval
96         #  self._pubkey
97         #  self._required_shares
98         #  self._total_shares
99         # and these are needed for Publish. They are filled in by Retrieval
100         # if possible, otherwise by the first peer that Publish talks to.
101         self._privkey = None
102         self._encprivkey = None
103         return self
104
105     def create_with_keys(self, (pubkey, privkey), contents):
106         """Call this to create a brand-new mutable file. It will create the
107         shares, find homes for them, and upload the initial contents (created
108         with the same rules as IClient.create_mutable_file() ). Returns a
109         Deferred that fires (with the MutableFileNode instance you should
110         use) when it completes.
111         """
112         self._pubkey, self._privkey = pubkey, privkey
113         pubkey_s = self._pubkey.serialize()
114         privkey_s = self._privkey.serialize()
115         self._writekey = hashutil.ssk_writekey_hash(privkey_s)
116         self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
117         self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
118         self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
119         self._readkey = self._uri.readkey
120         self._storage_index = self._uri.storage_index
121         initial_contents = self._get_initial_contents(contents)
122         return self._upload(initial_contents, None)
123
124     def _get_initial_contents(self, contents):
125         if isinstance(contents, str):
126             return contents
127         if contents is None:
128             return ""
129         assert callable(contents), "%s should be callable, not %s" % \
130                (contents, type(contents))
131         return contents(self)
132
133     def _encrypt_privkey(self, writekey, privkey):
134         enc = AES(writekey)
135         crypttext = enc.process(privkey)
136         return crypttext
137
138     def _decrypt_privkey(self, enc_privkey):
139         enc = AES(self._writekey)
140         privkey = enc.process(enc_privkey)
141         return privkey
142
143     def _populate_pubkey(self, pubkey):
144         self._pubkey = pubkey
145     def _populate_required_shares(self, required_shares):
146         self._required_shares = required_shares
147     def _populate_total_shares(self, total_shares):
148         self._total_shares = total_shares
149
150     def _populate_privkey(self, privkey):
151         self._privkey = privkey
152     def _populate_encprivkey(self, encprivkey):
153         self._encprivkey = encprivkey
154
155
156     def get_write_enabler(self, peerid):
157         assert len(peerid) == 20
158         return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
159     def get_renewal_secret(self, peerid):
160         assert len(peerid) == 20
161         crs = self._secret_holder.get_renewal_secret()
162         frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
163         return hashutil.bucket_renewal_secret_hash(frs, peerid)
164     def get_cancel_secret(self, peerid):
165         assert len(peerid) == 20
166         ccs = self._secret_holder.get_cancel_secret()
167         fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
168         return hashutil.bucket_cancel_secret_hash(fcs, peerid)
169
170     def get_writekey(self):
171         return self._writekey
172     def get_readkey(self):
173         return self._readkey
174     def get_storage_index(self):
175         return self._storage_index
176     def get_privkey(self):
177         return self._privkey
178     def get_encprivkey(self):
179         return self._encprivkey
180     def get_pubkey(self):
181         return self._pubkey
182
183     def get_required_shares(self):
184         return self._required_shares
185     def get_total_shares(self):
186         return self._total_shares
187
188     ####################################
189     # IFilesystemNode
190
191     def get_uri(self):
192         return self._uri.to_string()
193     def get_size(self):
194         return "?" # TODO: this is likely to cause problems, not being an int
195     def get_readonly(self):
196         if self.is_readonly():
197             return self
198         ro = MutableFileNode(self._storage_broker, self._secret_holder,
199                              self._default_encoding_parameters, self._history)
200         ro.init_from_uri(self.get_readonly_uri())
201         return ro
202
203     def get_readonly_uri(self):
204         return self._uri.get_readonly().to_string()
205
206     def is_mutable(self):
207         return self._uri.is_mutable()
208     def is_readonly(self):
209         return self._uri.is_readonly()
210
211     def __hash__(self):
212         return hash((self.__class__, self._uri))
213     def __cmp__(self, them):
214         if cmp(type(self), type(them)):
215             return cmp(type(self), type(them))
216         if cmp(self.__class__, them.__class__):
217             return cmp(self.__class__, them.__class__)
218         return cmp(self._uri, them._uri)
219
220     def get_verify_cap(self):
221         return IMutableFileURI(self._uri).get_verify_cap()
222
223     def get_repair_cap(self):
224         if self._uri.is_readonly():
225             return None
226         return self._uri
227
228     def _do_serialized(self, cb, *args, **kwargs):
229         # note: to avoid deadlock, this callable is *not* allowed to invoke
230         # other serialized methods within this (or any other)
231         # MutableFileNode. The callable should be a bound method of this same
232         # MFN instance.
233         d = defer.Deferred()
234         self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
235         # we need to put off d.callback until this Deferred is finished being
236         # processed. Otherwise the caller's subsequent activities (like,
237         # doing other things with this node) can cause reentrancy problems in
238         # the Deferred code itself
239         self._serializer.addBoth(lambda res: eventually(d.callback, res))
240         # add a log.err just in case something really weird happens, because
241         # self._serializer stays around forever, therefore we won't see the
242         # usual Unhandled Error in Deferred that would give us a hint.
243         self._serializer.addErrback(log.err)
244         return d
245
246     #################################
247     # ICheckable
248
249     def check(self, monitor, verify=False, add_lease=False):
250         checker = MutableChecker(self, self._storage_broker,
251                                  self._history, monitor)
252         return checker.check(verify, add_lease)
253
254     def check_and_repair(self, monitor, verify=False, add_lease=False):
255         checker = MutableCheckAndRepairer(self, self._storage_broker,
256                                           self._history, monitor)
257         return checker.check(verify, add_lease)
258
259     #################################
260     # IRepairable
261
262     def repair(self, check_results, force=False):
263         assert ICheckResults(check_results)
264         r = Repairer(self, check_results)
265         d = r.start(force)
266         return d
267
268
269     #################################
270     # IMutableFileNode
271
272     # allow the use of IDownloadTarget
273     def download(self, target):
274         # fake it. TODO: make this cleaner.
275         d = self.download_best_version()
276         def _done(data):
277             target.open(len(data))
278             target.write(data)
279             target.close()
280             return target.finish()
281         d.addCallback(_done)
282         return d
283
284
285     # new API
286
287     def download_best_version(self):
288         return self._do_serialized(self._download_best_version)
289     def _download_best_version(self):
290         servermap = ServerMap()
291         d = self._try_once_to_download_best_version(servermap, MODE_READ)
292         def _maybe_retry(f):
293             f.trap(NotEnoughSharesError)
294             # the download is worth retrying once. Make sure to use the
295             # old servermap, since it is what remembers the bad shares,
296             # but use MODE_WRITE to make it look for even more shares.
297             # TODO: consider allowing this to retry multiple times.. this
298             # approach will let us tolerate about 8 bad shares, I think.
299             return self._try_once_to_download_best_version(servermap,
300                                                            MODE_WRITE)
301         d.addErrback(_maybe_retry)
302         return d
303     def _try_once_to_download_best_version(self, servermap, mode):
304         d = self._update_servermap(servermap, mode)
305         d.addCallback(self._once_updated_download_best_version, servermap)
306         return d
307     def _once_updated_download_best_version(self, ignored, servermap):
308         goal = servermap.best_recoverable_version()
309         if not goal:
310             raise UnrecoverableFileError("no recoverable versions")
311         return self._try_once_to_download_version(servermap, goal)
312
313     def get_size_of_best_version(self):
314         d = self.get_servermap(MODE_READ)
315         def _got_servermap(smap):
316             ver = smap.best_recoverable_version()
317             if not ver:
318                 raise UnrecoverableFileError("no recoverable version")
319             return smap.size_of_version(ver)
320         d.addCallback(_got_servermap)
321         return d
322
323     def overwrite(self, new_contents):
324         return self._do_serialized(self._overwrite, new_contents)
325     def _overwrite(self, new_contents):
326         servermap = ServerMap()
327         d = self._update_servermap(servermap, mode=MODE_WRITE)
328         d.addCallback(lambda ignored: self._upload(new_contents, servermap))
329         return d
330
331
332     def modify(self, modifier, backoffer=None):
333         """I use a modifier callback to apply a change to the mutable file.
334         I implement the following pseudocode::
335
336          obtain_mutable_filenode_lock()
337          first_time = True
338          while True:
339            update_servermap(MODE_WRITE)
340            old = retrieve_best_version()
341            new = modifier(old, servermap, first_time)
342            first_time = False
343            if new == old: break
344            try:
345              publish(new)
346            except UncoordinatedWriteError, e:
347              backoffer(e)
348              continue
349            break
350          release_mutable_filenode_lock()
351
352         The idea is that your modifier function can apply a delta of some
353         sort, and it will be re-run as necessary until it succeeds. The
354         modifier must inspect the old version to see whether its delta has
355         already been applied: if so it should return the contents unmodified.
356
357         Note that the modifier is required to run synchronously, and must not
358         invoke any methods on this MutableFileNode instance.
359
360         The backoff-er is a callable that is responsible for inserting a
361         random delay between subsequent attempts, to help competing updates
362         from colliding forever. It is also allowed to give up after a while.
363         The backoffer is given two arguments: this MutableFileNode, and the
364         Failure object that contains the UncoordinatedWriteError. It should
365         return a Deferred that will fire when the next attempt should be
366         made, or return the Failure if the loop should give up. If
367         backoffer=None, a default one is provided which will perform
368         exponential backoff, and give up after 4 tries. Note that the
369         backoffer should not invoke any methods on this MutableFileNode
370         instance, and it needs to be highly conscious of deadlock issues.
371         """
372         return self._do_serialized(self._modify, modifier, backoffer)
373     def _modify(self, modifier, backoffer):
374         servermap = ServerMap()
375         if backoffer is None:
376             backoffer = BackoffAgent().delay
377         return self._modify_and_retry(servermap, modifier, backoffer, True)
378     def _modify_and_retry(self, servermap, modifier, backoffer, first_time):
379         d = self._modify_once(servermap, modifier, first_time)
380         def _retry(f):
381             f.trap(UncoordinatedWriteError)
382             d2 = defer.maybeDeferred(backoffer, self, f)
383             d2.addCallback(lambda ignored:
384                            self._modify_and_retry(servermap, modifier,
385                                                   backoffer, False))
386             return d2
387         d.addErrback(_retry)
388         return d
389     def _modify_once(self, servermap, modifier, first_time):
390         d = self._update_servermap(servermap, MODE_WRITE)
391         d.addCallback(self._once_updated_download_best_version, servermap)
392         def _apply(old_contents):
393             new_contents = modifier(old_contents, servermap, first_time)
394             if new_contents is None or new_contents == old_contents:
395                 # no changes need to be made
396                 if first_time:
397                     return
398                 # However, since Publish is not automatically doing a
399                 # recovery when it observes UCWE, we need to do a second
400                 # publish. See #551 for details. We'll basically loop until
401                 # we managed an uncontested publish.
402                 new_contents = old_contents
403             precondition(isinstance(new_contents, str),
404                          "Modifier function must return a string or None")
405             return self._upload(new_contents, servermap)
406         d.addCallback(_apply)
407         return d
408
409     def get_servermap(self, mode):
410         return self._do_serialized(self._get_servermap, mode)
411     def _get_servermap(self, mode):
412         servermap = ServerMap()
413         return self._update_servermap(servermap, mode)
414     def _update_servermap(self, servermap, mode):
415         u = ServermapUpdater(self, self._storage_broker, Monitor(), servermap,
416                              mode)
417         if self._history:
418             self._history.notify_mapupdate(u.get_status())
419         return u.update()
420
421     def download_version(self, servermap, version, fetch_privkey=False):
422         return self._do_serialized(self._try_once_to_download_version,
423                                    servermap, version, fetch_privkey)
424     def _try_once_to_download_version(self, servermap, version,
425                                       fetch_privkey=False):
426         r = Retrieve(self, servermap, version, fetch_privkey)
427         if self._history:
428             self._history.notify_retrieve(r.get_status())
429         return r.download()
430
431     def upload(self, new_contents, servermap):
432         return self._do_serialized(self._upload, new_contents, servermap)
433     def _upload(self, new_contents, servermap):
434         assert self._pubkey, "update_servermap must be called before publish"
435         p = Publish(self, self._storage_broker, servermap)
436         if self._history:
437             self._history.notify_publish(p.get_status(), len(new_contents))
438         return p.publish(new_contents)