]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/filenode.py
Overhaul IFilesystemNode handling, to simplify tests and use POLA internally.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / filenode.py
1
2 import random
3
4 from zope.interface import implements
5 from twisted.internet import defer, reactor
6 from foolscap.api import eventually
7 from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \
8      ICheckable, ICheckResults, NotEnoughSharesError
9 from allmydata.util import hashutil, log
10 from allmydata.util.assertutil import precondition
11 from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI
12 from allmydata.monitor import Monitor
13 from pycryptopp.cipher.aes import AES
14
15 from publish import Publish
16 from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \
17      ResponseCache, UncoordinatedWriteError
18 from servermap import ServerMap, ServermapUpdater
19 from retrieve import Retrieve
20 from checker import MutableChecker, MutableCheckAndRepairer
21 from repairer import Repairer
22
23
class BackoffAgent:
    """Default backoff policy used by MutableFileNode.modify().

    delay() returns a Deferred that fires after an exponentially growing,
    jittered interval. Once maxRetries attempts have been made it gives up
    by returning the Failure itself instead of a Deferred.
    """
    # these parameters are copied from foolscap.reconnector, which gets them
    # from twisted.internet.protocol.ReconnectingClientFactory
    initialDelay = 1.0
    factor = 2.7182818284590451 # (math.e)
    jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
    maxRetries = 4

    def __init__(self):
        self._delay = self.initialDelay
        self._count = 0

    def delay(self, node, f):
        """Schedule the next retry for 'node', or give up.

        Returns a Deferred that fires (with None) when the next attempt
        should be made, or the Failure 'f' once we have been called
        maxRetries times.
        """
        self._count += 1
        # fix: honor the maxRetries class attribute instead of a
        # hard-coded 4, so subclasses that override it behave as expected
        if self._count >= self.maxRetries:
            return f
        self._delay = self._delay * self.factor
        self._delay = random.normalvariate(self._delay,
                                           self._delay * self.jitter)
        d = defer.Deferred()
        reactor.callLater(self._delay, d.callback, None)
        return d
45
# use nodemaker.create_mutable_file() to make one of these

class MutableFileNode:
    implements(IMutableFileNode, ICheckable)

    def __init__(self, storage_broker, secret_holder,
                 default_encoding_parameters, history):
        """Create an empty node; populate it with init_from_uri() or
        create_with_keys()."""
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._default_encoding_parameters = default_encoding_parameters
        self._history = history
        # learned lazily: the pubkey on first read, the privkey only if
        # this node turns out to be writable
        self._pubkey = None
        self._privkey = None
        # Remember the most recent encoding parameters. Retrieval updates
        # them and Publish consumes them; a publish without a prior read
        # (i.e. overwrite()) falls back to these defaults.
        self._required_shares = default_encoding_parameters["k"]
        self._total_shares = default_encoding_parameters["n"]
        self._sharemap = {} # known shares, shnum-to-[nodeids]
        self._cache = ResponseCache()

        # Every user of this node goes through the serializer. Deferreds
        # discard callbacks they are done with, so the same Deferred can
        # be reused forever without growing without bound.
        self._serializer = defer.succeed(None)
73     def __repr__(self):
74         if hasattr(self, '_uri'):
75             return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
76         else:
77             return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)
78
79     def init_from_uri(self, filecap):
80         # we have the URI, but we have not yet retrieved the public
81         # verification key, nor things like 'k' or 'N'. If and when someone
82         # wants to get our contents, we'll pull from shares and fill those
83         # in.
84         assert isinstance(filecap, str)
85         if filecap.startswith("URI:SSK:"):
86             self._uri = WriteableSSKFileURI.init_from_string(filecap)
87             self._writekey = self._uri.writekey
88         else:
89             assert filecap.startswith("URI:SSK-RO:")
90             self._uri = ReadonlySSKFileURI.init_from_string(filecap)
91             self._writekey = None
92         self._readkey = self._uri.readkey
93         self._storage_index = self._uri.storage_index
94         self._fingerprint = self._uri.fingerprint
95         # the following values are learned during Retrieval
96         #  self._pubkey
97         #  self._required_shares
98         #  self._total_shares
99         # and these are needed for Publish. They are filled in by Retrieval
100         # if possible, otherwise by the first peer that Publish talks to.
101         self._privkey = None
102         self._encprivkey = None
103         return self
104
105     def create_with_keys(self, (pubkey, privkey), initial_contents):
106         """Call this to create a brand-new mutable file. It will create the
107         shares, find homes for them, and upload the initial contents. Returns
108         a Deferred that fires (with the MutableFileNode instance you should
109         use) when it completes.
110         """
111         self._pubkey, self._privkey = pubkey, privkey
112         pubkey_s = self._pubkey.serialize()
113         privkey_s = self._privkey.serialize()
114         self._writekey = hashutil.ssk_writekey_hash(privkey_s)
115         self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
116         self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
117         self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
118         self._readkey = self._uri.readkey
119         self._storage_index = self._uri.storage_index
120         return self._upload(initial_contents, None)
121
122     def _encrypt_privkey(self, writekey, privkey):
123         enc = AES(writekey)
124         crypttext = enc.process(privkey)
125         return crypttext
126
127     def _decrypt_privkey(self, enc_privkey):
128         enc = AES(self._writekey)
129         privkey = enc.process(enc_privkey)
130         return privkey
131
132     def _populate_pubkey(self, pubkey):
133         self._pubkey = pubkey
134     def _populate_required_shares(self, required_shares):
135         self._required_shares = required_shares
136     def _populate_total_shares(self, total_shares):
137         self._total_shares = total_shares
138
139     def _populate_privkey(self, privkey):
140         self._privkey = privkey
141     def _populate_encprivkey(self, encprivkey):
142         self._encprivkey = encprivkey
143
144
145     def get_write_enabler(self, peerid):
146         assert len(peerid) == 20
147         return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
148     def get_renewal_secret(self, peerid):
149         assert len(peerid) == 20
150         crs = self._secret_holder.get_renewal_secret()
151         frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
152         return hashutil.bucket_renewal_secret_hash(frs, peerid)
153     def get_cancel_secret(self, peerid):
154         assert len(peerid) == 20
155         ccs = self._secret_holder.get_cancel_secret()
156         fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
157         return hashutil.bucket_cancel_secret_hash(fcs, peerid)
158
159     def get_writekey(self):
160         return self._writekey
161     def get_readkey(self):
162         return self._readkey
163     def get_storage_index(self):
164         return self._storage_index
165     def get_privkey(self):
166         return self._privkey
167     def get_encprivkey(self):
168         return self._encprivkey
169     def get_pubkey(self):
170         return self._pubkey
171
172     def get_required_shares(self):
173         return self._required_shares
174     def get_total_shares(self):
175         return self._total_shares
176
177     ####################################
178     # IFilesystemNode
179
180     def get_uri(self):
181         return self._uri.to_string()
182     def get_size(self):
183         return "?" # TODO: this is likely to cause problems, not being an int
184     def get_readonly(self):
185         if self.is_readonly():
186             return self
187         ro = MutableFileNode(self._storage_broker, self._secret_holder,
188                              self._default_encoding_parameters, self._history)
189         ro.init_from_uri(self.get_readonly_uri())
190         return ro
191
192     def get_readonly_uri(self):
193         return self._uri.get_readonly().to_string()
194
195     def is_mutable(self):
196         return self._uri.is_mutable()
197     def is_readonly(self):
198         return self._uri.is_readonly()
199
200     def __hash__(self):
201         return hash((self.__class__, self._uri))
202     def __cmp__(self, them):
203         if cmp(type(self), type(them)):
204             return cmp(type(self), type(them))
205         if cmp(self.__class__, them.__class__):
206             return cmp(self.__class__, them.__class__)
207         return cmp(self._uri, them._uri)
208
209     def get_verify_cap(self):
210         return IMutableFileURI(self._uri).get_verify_cap()
211
212     def get_repair_cap(self):
213         if self._uri.is_readonly():
214             return None
215         return self._uri
216
217     def _do_serialized(self, cb, *args, **kwargs):
218         # note: to avoid deadlock, this callable is *not* allowed to invoke
219         # other serialized methods within this (or any other)
220         # MutableFileNode. The callable should be a bound method of this same
221         # MFN instance.
222         d = defer.Deferred()
223         self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
224         # we need to put off d.callback until this Deferred is finished being
225         # processed. Otherwise the caller's subsequent activities (like,
226         # doing other things with this node) can cause reentrancy problems in
227         # the Deferred code itself
228         self._serializer.addBoth(lambda res: eventually(d.callback, res))
229         # add a log.err just in case something really weird happens, because
230         # self._serializer stays around forever, therefore we won't see the
231         # usual Unhandled Error in Deferred that would give us a hint.
232         self._serializer.addErrback(log.err)
233         return d
234
235     #################################
236     # ICheckable
237
238     def check(self, monitor, verify=False, add_lease=False):
239         checker = MutableChecker(self, self._storage_broker,
240                                  self._history, monitor)
241         return checker.check(verify, add_lease)
242
243     def check_and_repair(self, monitor, verify=False, add_lease=False):
244         checker = MutableCheckAndRepairer(self, self._storage_broker,
245                                           self._history, monitor)
246         return checker.check(verify, add_lease)
247
248     #################################
249     # IRepairable
250
251     def repair(self, check_results, force=False):
252         assert ICheckResults(check_results)
253         r = Repairer(self, check_results)
254         d = r.start(force)
255         return d
256
257
258     #################################
259     # IMutableFileNode
260
261     # allow the use of IDownloadTarget
262     def download(self, target):
263         # fake it. TODO: make this cleaner.
264         d = self.download_best_version()
265         def _done(data):
266             target.open(len(data))
267             target.write(data)
268             target.close()
269             return target.finish()
270         d.addCallback(_done)
271         return d
272
273
274     # new API
275
276     def download_best_version(self):
277         return self._do_serialized(self._download_best_version)
278     def _download_best_version(self):
279         servermap = ServerMap()
280         d = self._try_once_to_download_best_version(servermap, MODE_READ)
281         def _maybe_retry(f):
282             f.trap(NotEnoughSharesError)
283             # the download is worth retrying once. Make sure to use the
284             # old servermap, since it is what remembers the bad shares,
285             # but use MODE_WRITE to make it look for even more shares.
286             # TODO: consider allowing this to retry multiple times.. this
287             # approach will let us tolerate about 8 bad shares, I think.
288             return self._try_once_to_download_best_version(servermap,
289                                                            MODE_WRITE)
290         d.addErrback(_maybe_retry)
291         return d
292     def _try_once_to_download_best_version(self, servermap, mode):
293         d = self._update_servermap(servermap, mode)
294         d.addCallback(self._once_updated_download_best_version, servermap)
295         return d
296     def _once_updated_download_best_version(self, ignored, servermap):
297         goal = servermap.best_recoverable_version()
298         if not goal:
299             raise UnrecoverableFileError("no recoverable versions")
300         return self._try_once_to_download_version(servermap, goal)
301
302     def get_size_of_best_version(self):
303         d = self.get_servermap(MODE_READ)
304         def _got_servermap(smap):
305             ver = smap.best_recoverable_version()
306             if not ver:
307                 raise UnrecoverableFileError("no recoverable version")
308             return smap.size_of_version(ver)
309         d.addCallback(_got_servermap)
310         return d
311
312     def overwrite(self, new_contents):
313         return self._do_serialized(self._overwrite, new_contents)
314     def _overwrite(self, new_contents):
315         servermap = ServerMap()
316         d = self._update_servermap(servermap, mode=MODE_WRITE)
317         d.addCallback(lambda ignored: self._upload(new_contents, servermap))
318         return d
319
320
321     def modify(self, modifier, backoffer=None):
322         """I use a modifier callback to apply a change to the mutable file.
323         I implement the following pseudocode::
324
325          obtain_mutable_filenode_lock()
326          first_time = True
327          while True:
328            update_servermap(MODE_WRITE)
329            old = retrieve_best_version()
330            new = modifier(old, servermap, first_time)
331            first_time = False
332            if new == old: break
333            try:
334              publish(new)
335            except UncoordinatedWriteError, e:
336              backoffer(e)
337              continue
338            break
339          release_mutable_filenode_lock()
340
341         The idea is that your modifier function can apply a delta of some
342         sort, and it will be re-run as necessary until it succeeds. The
343         modifier must inspect the old version to see whether its delta has
344         already been applied: if so it should return the contents unmodified.
345
346         Note that the modifier is required to run synchronously, and must not
347         invoke any methods on this MutableFileNode instance.
348
349         The backoff-er is a callable that is responsible for inserting a
350         random delay between subsequent attempts, to help competing updates
351         from colliding forever. It is also allowed to give up after a while.
352         The backoffer is given two arguments: this MutableFileNode, and the
353         Failure object that contains the UncoordinatedWriteError. It should
354         return a Deferred that will fire when the next attempt should be
355         made, or return the Failure if the loop should give up. If
356         backoffer=None, a default one is provided which will perform
357         exponential backoff, and give up after 4 tries. Note that the
358         backoffer should not invoke any methods on this MutableFileNode
359         instance, and it needs to be highly conscious of deadlock issues.
360         """
361         return self._do_serialized(self._modify, modifier, backoffer)
362     def _modify(self, modifier, backoffer):
363         servermap = ServerMap()
364         if backoffer is None:
365             backoffer = BackoffAgent().delay
366         return self._modify_and_retry(servermap, modifier, backoffer, True)
367     def _modify_and_retry(self, servermap, modifier, backoffer, first_time):
368         d = self._modify_once(servermap, modifier, first_time)
369         def _retry(f):
370             f.trap(UncoordinatedWriteError)
371             d2 = defer.maybeDeferred(backoffer, self, f)
372             d2.addCallback(lambda ignored:
373                            self._modify_and_retry(servermap, modifier,
374                                                   backoffer, False))
375             return d2
376         d.addErrback(_retry)
377         return d
378     def _modify_once(self, servermap, modifier, first_time):
379         d = self._update_servermap(servermap, MODE_WRITE)
380         d.addCallback(self._once_updated_download_best_version, servermap)
381         def _apply(old_contents):
382             new_contents = modifier(old_contents, servermap, first_time)
383             if new_contents is None or new_contents == old_contents:
384                 # no changes need to be made
385                 if first_time:
386                     return
387                 # However, since Publish is not automatically doing a
388                 # recovery when it observes UCWE, we need to do a second
389                 # publish. See #551 for details. We'll basically loop until
390                 # we managed an uncontested publish.
391                 new_contents = old_contents
392             precondition(isinstance(new_contents, str),
393                          "Modifier function must return a string or None")
394             return self._upload(new_contents, servermap)
395         d.addCallback(_apply)
396         return d
397
398     def get_servermap(self, mode):
399         return self._do_serialized(self._get_servermap, mode)
400     def _get_servermap(self, mode):
401         servermap = ServerMap()
402         return self._update_servermap(servermap, mode)
403     def _update_servermap(self, servermap, mode):
404         u = ServermapUpdater(self, self._storage_broker, Monitor(), servermap,
405                              mode)
406         if self._history:
407             self._history.notify_mapupdate(u.get_status())
408         return u.update()
409
410     def download_version(self, servermap, version, fetch_privkey=False):
411         return self._do_serialized(self._try_once_to_download_version,
412                                    servermap, version, fetch_privkey)
413     def _try_once_to_download_version(self, servermap, version,
414                                       fetch_privkey=False):
415         r = Retrieve(self, servermap, version, fetch_privkey)
416         if self._history:
417             self._history.notify_retrieve(r.get_status())
418         return r.download()
419
420     def upload(self, new_contents, servermap):
421         return self._do_serialized(self._upload, new_contents, servermap)
422     def _upload(self, new_contents, servermap):
423         assert self._pubkey, "update_servermap must be called before publish"
424         p = Publish(self, self._storage_broker, servermap)
425         if self._history:
426             self._history.notify_publish(p.get_status(), len(new_contents))
427         return p.publish(new_contents)