import random

from zope.interface import implements
from twisted.internet import defer, reactor
from foolscap.api import eventually
from allmydata.interfaces import IMutableFileNode, \
     ICheckable, ICheckResults, NotEnoughSharesError
from allmydata.util import hashutil, log
from allmydata.util.assertutil import precondition
from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI
from allmydata.monitor import Monitor
from pycryptopp.cipher.aes import AES

from publish import Publish
from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \
     ResponseCache, UncoordinatedWriteError
from servermap import ServerMap, ServermapUpdater
from retrieve import Retrieve
from checker import MutableChecker, MutableCheckAndRepairer
from repairer import Repairer


class BackoffAgent:
    # these parameters are copied from foolscap.reconnector, which gets them
    # from twisted.internet.protocol.ReconnectingClientFactory
    initialDelay = 1.0
    factor = 2.7182818284590451 # (math.e)
    jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
    maxRetries = 4

    def __init__(self):
        self._delay = self.initialDelay
        self._count = 0
    def delay(self, node, f):
        self._count += 1
        if self._count >= self.maxRetries:
            return f
        self._delay = self._delay * self.factor
        self._delay = random.normalvariate(self._delay,
                                           self._delay * self.jitter)
        d = defer.Deferred()
        reactor.callLater(self._delay, d.callback, None)
        return d
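
# Illustrative sketch (not part of the original class): each call to
# delay() multiplies the previous delay by e and then perturbs it with
# gaussian jitter, so successive waits are roughly 2.7s, 7.4s, and 20s
# before the agent gives up on the fourth attempt:
#
#   agent = BackoffAgent()
#   d = agent.delay(node, f)   # Deferred firing after ~2.7s
#   d = agent.delay(node, f)   # ~7.4s
#   d = agent.delay(node, f)   # ~20s
#   f = agent.delay(node, f)   # 4th call returns the Failure: give up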

# use nodemaker.create_mutable_file() to make one of these

class MutableFileNode:
    implements(IMutableFileNode, ICheckable)

    def __init__(self, storage_broker, secret_holder,
                 default_encoding_parameters, history):
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._default_encoding_parameters = default_encoding_parameters
        self._history = history
        self._pubkey = None # filled in upon first read
        self._privkey = None # filled in if we're mutable
        # we keep track of the last encoding parameters that we use. These
        # are updated upon retrieve, and used by publish. If we publish
        # without ever reading (i.e. overwrite()), then we use these values.
        self._required_shares = default_encoding_parameters["k"]
        self._total_shares = default_encoding_parameters["n"]
        self._sharemap = {} # known shares, shnum-to-[nodeids]
        self._cache = ResponseCache()
        self._most_recent_size = None

        # all users of this MutableFileNode go through the serializer. This
        # takes advantage of the fact that Deferreds discard the callbacks
        # that they're done with, so we can keep using the same Deferred
        # forever without consuming more and more memory.
        self._serializer = defer.succeed(None)

    def __repr__(self):
        if hasattr(self, '_uri'):
            return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
        else:
            return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)

    def init_from_cap(self, filecap):
        # we have the URI, but we have not yet retrieved the public
        # verification key, nor things like 'k' or 'N'. If and when someone
        # wants to get our contents, we'll pull from shares and fill those
        # in.
        assert isinstance(filecap, (ReadonlySSKFileURI, WriteableSSKFileURI))
        self._uri = filecap
        self._writekey = None
        if isinstance(filecap, WriteableSSKFileURI):
            self._writekey = self._uri.writekey
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        self._fingerprint = self._uri.fingerprint
        # the following values are learned during Retrieval
        #  self._pubkey
        #  self._required_shares
        #  self._total_shares
        # and these are needed for Publish. They are filled in by Retrieval
        # if possible, otherwise by the first peer that Publish talks to.
        self._privkey = None
        self._encprivkey = None
        return self

    def create_with_keys(self, (pubkey, privkey), contents):
        """Call this to create a brand-new mutable file. It will create the
        shares, find homes for them, and upload the initial contents (created
        with the same rules as IClient.create_mutable_file()). Returns a
        Deferred that fires (with the MutableFileNode instance you should
        use) when it completes.
        """
        self._pubkey, self._privkey = pubkey, privkey
        pubkey_s = self._pubkey.serialize()
        privkey_s = self._privkey.serialize()
        self._writekey = hashutil.ssk_writekey_hash(privkey_s)
        self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
        self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
        self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        initial_contents = self._get_initial_contents(contents)
        return self._upload(initial_contents, None)
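
    # A sketch of the cap-derivation chain used above (an assumption drawn
    # from the helpers in allmydata.util.hashutil and the SSK URI classes,
    # not spelled out in this file):
    #
    #   privkey  --ssk_writekey_hash-->           writekey
    #   writekey --> readkey --> storage_index    (computed by the URI class)
    #   pubkey   --ssk_pubkey_fingerprint_hash--> fingerprint
    #
    # so a writecap (writekey + fingerprint) suffices to re-derive the
    # readcap and the storage index, but not vice versa.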

    def _get_initial_contents(self, contents):
        if isinstance(contents, str):
            return contents
        if contents is None:
            return ""
        assert callable(contents), "%s should be callable, not %s" % \
               (contents, type(contents))
        return contents(self)

    def _encrypt_privkey(self, writekey, privkey):
        enc = AES(writekey)
        crypttext = enc.process(privkey)
        return crypttext

    def _decrypt_privkey(self, enc_privkey):
        enc = AES(self._writekey)
        privkey = enc.process(enc_privkey)
        return privkey
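
    # Note: pycryptopp's AES is used here as a stream cipher (CTR mode),
    # so encryption and decryption are the same keystream XOR; that is why
    # _encrypt_privkey and _decrypt_privkey are identical apart from their
    # names. A minimal roundtrip sketch (assuming a 16-byte key):
    #
    #   key = "k" * 16
    #   assert AES(key).process(AES(key).process("secret")) == "secret"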

    def _populate_pubkey(self, pubkey):
        self._pubkey = pubkey
    def _populate_required_shares(self, required_shares):
        self._required_shares = required_shares
    def _populate_total_shares(self, total_shares):
        self._total_shares = total_shares

    def _populate_privkey(self, privkey):
        self._privkey = privkey
    def _populate_encprivkey(self, encprivkey):
        self._encprivkey = encprivkey
    def _add_to_cache(self, verinfo, shnum, offset, data, timestamp):
        self._cache.add(verinfo, shnum, offset, data, timestamp)
    def _read_from_cache(self, verinfo, shnum, offset, length):
        return self._cache.read(verinfo, shnum, offset, length)

    def get_write_enabler(self, peerid):
        assert len(peerid) == 20
        return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
    def get_renewal_secret(self, peerid):
        assert len(peerid) == 20
        crs = self._secret_holder.get_renewal_secret()
        frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
        return hashutil.bucket_renewal_secret_hash(frs, peerid)
    def get_cancel_secret(self, peerid):
        assert len(peerid) == 20
        ccs = self._secret_holder.get_cancel_secret()
        fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
        return hashutil.bucket_cancel_secret_hash(fcs, peerid)

    def get_writekey(self):
        return self._writekey
    def get_readkey(self):
        return self._readkey
    def get_storage_index(self):
        return self._storage_index
    def get_fingerprint(self):
        return self._fingerprint
    def get_privkey(self):
        return self._privkey
    def get_encprivkey(self):
        return self._encprivkey
    def get_pubkey(self):
        return self._pubkey

    def get_required_shares(self):
        return self._required_shares
    def get_total_shares(self):
        return self._total_shares

    ####################################
    # IFilesystemNode

    def get_size(self):
        return self._most_recent_size
    def get_current_size(self):
        d = self.get_size_of_best_version()
        d.addCallback(self._stash_size)
        return d
    def _stash_size(self, size):
        self._most_recent_size = size
        return size

    def get_cap(self):
        return self._uri
    def get_readcap(self):
        return self._uri.get_readonly()
    def get_verify_cap(self):
        return self._uri.get_verify_cap()
    def get_repair_cap(self):
        if self._uri.is_readonly():
            return None
        return self._uri

    def get_uri(self):
        return self._uri.to_string()

    def get_write_uri(self):
        if self.is_readonly():
            return None
        return self._uri.to_string()

    def get_readonly_uri(self):
        return self._uri.get_readonly().to_string()

    def get_readonly(self):
        if self.is_readonly():
            return self
        ro = MutableFileNode(self._storage_broker, self._secret_holder,
                             self._default_encoding_parameters, self._history)
        ro.init_from_cap(self._uri.get_readonly())
        return ro

    def is_mutable(self):
        return self._uri.is_mutable()

    def is_readonly(self):
        return self._uri.is_readonly()

    def is_unknown(self):
        return False

    def is_allowed_in_immutable_directory(self):
        return not self._uri.is_mutable()

    def raise_error(self):
        pass

    def __hash__(self):
        return hash((self.__class__, self._uri))
    def __cmp__(self, them):
        if cmp(type(self), type(them)):
            return cmp(type(self), type(them))
        if cmp(self.__class__, them.__class__):
            return cmp(self.__class__, them.__class__)
        return cmp(self._uri, them._uri)

    def _do_serialized(self, cb, *args, **kwargs):
        # note: to avoid deadlock, this callable is *not* allowed to invoke
        # other serialized methods within this (or any other)
        # MutableFileNode. The callable should be a bound method of this same
        # MFN instance.
        d = defer.Deferred()
        self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
        # we need to put off d.callback until this Deferred is finished being
        # processed. Otherwise the caller's subsequent activities (like,
        # doing other things with this node) can cause reentrancy problems in
        # the Deferred code itself
        self._serializer.addBoth(lambda res: eventually(d.callback, res))
        # add a log.err just in case something really weird happens, because
        # self._serializer stays around forever, therefore we won't see the
        # usual Unhandled Error in Deferred that would give us a hint.
        self._serializer.addErrback(log.err)
        return d
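
    # The serialization pattern above, in isolation (a hedged sketch, not
    # project API): one long-lived Deferred acts as a FIFO lock, because a
    # callback added to an already-fired Deferred runs only after all the
    # previously-chained callbacks have finished, and fired Deferreds
    # discard the callbacks they are done with, so the chain stays small:
    #
    #   serializer = defer.succeed(None)
    #   def serialized(cb, *args):
    #       d = defer.Deferred()
    #       serializer.addCallback(lambda ign: cb(*args))
    #       serializer.addBoth(lambda res: eventually(d.callback, res))
    #       serializer.addErrback(log.err)
    #       return d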

    #################################
    # ICheckable

    def check(self, monitor, verify=False, add_lease=False):
        checker = MutableChecker(self, self._storage_broker,
                                 self._history, monitor)
        return checker.check(verify, add_lease)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        checker = MutableCheckAndRepairer(self, self._storage_broker,
                                          self._history, monitor)
        return checker.check(verify, add_lease)

    #################################
    # IRepairable

    def repair(self, check_results, force=False):
        assert ICheckResults(check_results)
        r = Repairer(self, check_results)
        d = r.start(force)
        return d


    #################################
    # IMutableFileNode

    def download_best_version(self):
        return self._do_serialized(self._download_best_version)
    def _download_best_version(self):
        servermap = ServerMap()
        d = self._try_once_to_download_best_version(servermap, MODE_READ)
        def _maybe_retry(f):
            f.trap(NotEnoughSharesError)
            # the download is worth retrying once. Make sure to use the
            # old servermap, since it is what remembers the bad shares,
            # but use MODE_WRITE to make it look for even more shares.
            # TODO: consider allowing this to retry multiple times.. this
            # approach will let us tolerate about 8 bad shares, I think.
            return self._try_once_to_download_best_version(servermap,
                                                           MODE_WRITE)
        d.addErrback(_maybe_retry)
        return d
    def _try_once_to_download_best_version(self, servermap, mode):
        d = self._update_servermap(servermap, mode)
        d.addCallback(self._once_updated_download_best_version, servermap)
        return d
    def _once_updated_download_best_version(self, ignored, servermap):
        goal = servermap.best_recoverable_version()
        if not goal:
            raise UnrecoverableFileError("no recoverable versions")
        return self._try_once_to_download_version(servermap, goal)

    def get_size_of_best_version(self):
        d = self.get_servermap(MODE_READ)
        def _got_servermap(smap):
            ver = smap.best_recoverable_version()
            if not ver:
                raise UnrecoverableFileError("no recoverable version")
            return smap.size_of_version(ver)
        d.addCallback(_got_servermap)
        return d

    def overwrite(self, new_contents):
        return self._do_serialized(self._overwrite, new_contents)
    def _overwrite(self, new_contents):
        servermap = ServerMap()
        d = self._update_servermap(servermap, mode=MODE_WRITE)
        d.addCallback(lambda ignored: self._upload(new_contents, servermap))
        return d


    def modify(self, modifier, backoffer=None):
        """I use a modifier callback to apply a change to the mutable file.
        I implement the following pseudocode::

         obtain_mutable_filenode_lock()
         first_time = True
         while True:
           update_servermap(MODE_WRITE)
           old = retrieve_best_version()
           new = modifier(old, servermap, first_time)
           first_time = False
           if new == old: break
           try:
             publish(new)
           except UncoordinatedWriteError, e:
             backoffer(e)
             continue
           break
         release_mutable_filenode_lock()

        The idea is that your modifier function can apply a delta of some
        sort, and it will be re-run as necessary until it succeeds. The
        modifier must inspect the old version to see whether its delta has
        already been applied: if so it should return the contents unmodified.

        Note that the modifier is required to run synchronously, and must not
        invoke any methods on this MutableFileNode instance.

        The backoffer is a callable that is responsible for inserting a
        random delay between subsequent attempts, to keep competing updates
        from colliding forever. It is also allowed to give up after a while.
        The backoffer is given two arguments: this MutableFileNode, and the
        Failure object that contains the UncoordinatedWriteError. It should
        return a Deferred that will fire when the next attempt should be
        made, or return the Failure if the loop should give up. If
        backoffer=None, a default one is provided which will perform
        exponential backoff, and give up after 4 tries. Note that the
        backoffer should not invoke any methods on this MutableFileNode
        instance, and it needs to be highly conscious of deadlock issues.
        """
        return self._do_serialized(self._modify, modifier, backoffer)
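
    # A hedged usage sketch ("new-entry" is an invented payload): because
    # the modifier may be re-run after an UncoordinatedWriteError, it must
    # inspect the old contents to decide whether its delta has already
    # landed, returning None when no publish is needed:
    #
    #   def _add_entry(old_contents, servermap, first_time):
    #       lines = set(old_contents.splitlines())
    #       if "new-entry" in lines:
    #           return None   # delta already applied, nothing to publish
    #       return old_contents + "new-entry\n"
    #   d = node.modify(_add_entry)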
    def _modify(self, modifier, backoffer):
        servermap = ServerMap()
        if backoffer is None:
            backoffer = BackoffAgent().delay
        return self._modify_and_retry(servermap, modifier, backoffer, True)
    def _modify_and_retry(self, servermap, modifier, backoffer, first_time):
        d = self._modify_once(servermap, modifier, first_time)
        def _retry(f):
            f.trap(UncoordinatedWriteError)
            d2 = defer.maybeDeferred(backoffer, self, f)
            d2.addCallback(lambda ignored:
                           self._modify_and_retry(servermap, modifier,
                                                  backoffer, False))
            return d2
        d.addErrback(_retry)
        return d
    def _modify_once(self, servermap, modifier, first_time):
        d = self._update_servermap(servermap, MODE_WRITE)
        d.addCallback(self._once_updated_download_best_version, servermap)
        def _apply(old_contents):
            new_contents = modifier(old_contents, servermap, first_time)
            if new_contents is None or new_contents == old_contents:
                # no changes need to be made
                if first_time:
                    return
                # However, since Publish is not automatically doing a
                # recovery when it observes UCWE, we need to do a second
                # publish. See #551 for details. We'll basically loop until
                # we manage an uncontested publish.
                new_contents = old_contents
            precondition(isinstance(new_contents, str),
                         "Modifier function must return a string or None")
            return self._upload(new_contents, servermap)
        d.addCallback(_apply)
        return d

    def get_servermap(self, mode):
        return self._do_serialized(self._get_servermap, mode)
    def _get_servermap(self, mode):
        servermap = ServerMap()
        return self._update_servermap(servermap, mode)
    def _update_servermap(self, servermap, mode):
        u = ServermapUpdater(self, self._storage_broker, Monitor(), servermap,
                             mode)
        if self._history:
            self._history.notify_mapupdate(u.get_status())
        return u.update()

    def download_version(self, servermap, version, fetch_privkey=False):
        return self._do_serialized(self._try_once_to_download_version,
                                   servermap, version, fetch_privkey)
    def _try_once_to_download_version(self, servermap, version,
                                      fetch_privkey=False):
        r = Retrieve(self, servermap, version, fetch_privkey)
        if self._history:
            self._history.notify_retrieve(r.get_status())
        d = r.download()
        d.addCallback(self._downloaded_version)
        return d
    def _downloaded_version(self, data):
        self._most_recent_size = len(data)
        return data

    def upload(self, new_contents, servermap):
        return self._do_serialized(self._upload, new_contents, servermap)
    def _upload(self, new_contents, servermap):
        assert self._pubkey, "update_servermap must be called before publish"
        p = Publish(self, self._storage_broker, servermap)
        if self._history:
            self._history.notify_publish(p.get_status(), len(new_contents))
        d = p.publish(new_contents)
        d.addCallback(self._did_upload, len(new_contents))
        return d
    def _did_upload(self, res, size):
        self._most_recent_size = size
        return res