]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/filenode.py
switch all foolscap imports to use foolscap.api or foolscap.logging
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / filenode.py
1
2 import random
3
4 from zope.interface import implements
5 from twisted.internet import defer, reactor
6 from foolscap.api import eventually
7 from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \
8      ICheckable, ICheckResults, NotEnoughSharesError
9 from allmydata.util import hashutil, log
10 from allmydata.util.assertutil import precondition
11 from allmydata.uri import WriteableSSKFileURI
12 from allmydata.monitor import Monitor
13 from pycryptopp.publickey import rsa
14 from pycryptopp.cipher.aes import AES
15
16 from publish import Publish
17 from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \
18      ResponseCache, UncoordinatedWriteError
19 from servermap import ServerMap, ServermapUpdater
20 from retrieve import Retrieve
21 from checker import MutableChecker, MutableCheckAndRepairer
22 from repairer import Repairer
23
24
25 class BackoffAgent:
26     # these parameters are copied from foolscap.reconnector, which gets them
27     # from twisted.internet.protocol.ReconnectingClientFactory
28     initialDelay = 1.0
29     factor = 2.7182818284590451 # (math.e)
30     jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
31     maxRetries = 4
32
33     def __init__(self):
34         self._delay = self.initialDelay
35         self._count = 0
36     def delay(self, node, f):
37         self._count += 1
38         if self._count == 4:
39             return f
40         self._delay = self._delay * self.factor
41         self._delay = random.normalvariate(self._delay,
42                                            self._delay * self.jitter)
43         d = defer.Deferred()
44         reactor.callLater(self._delay, d.callback, None)
45         return d
46
47 # use client.create_mutable_file() to make one of these
48
49 class MutableFileNode:
50     implements(IMutableFileNode, ICheckable)
51     SIGNATURE_KEY_SIZE = 2048
52     checker_class = MutableChecker
53     check_and_repairer_class = MutableCheckAndRepairer
54
55     def __init__(self, client):
56         self._client = client
57         self._pubkey = None # filled in upon first read
58         self._privkey = None # filled in if we're mutable
59         # we keep track of the last encoding parameters that we use. These
60         # are updated upon retrieve, and used by publish. If we publish
61         # without ever reading (i.e. overwrite()), then we use these values.
62         defaults = client.get_encoding_parameters()
63         self._required_shares = defaults["k"]
64         self._total_shares = defaults["n"]
65         self._sharemap = {} # known shares, shnum-to-[nodeids]
66         self._cache = ResponseCache()
67
68         # all users of this MutableFileNode go through the serializer. This
69         # takes advantage of the fact that Deferreds discard the callbacks
70         # that they're done with, so we can keep using the same Deferred
71         # forever without consuming more and more memory.
72         self._serializer = defer.succeed(None)
73
74     def __repr__(self):
75         if hasattr(self, '_uri'):
76             return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
77         else:
78             return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)
79
80     def init_from_uri(self, myuri):
81         # we have the URI, but we have not yet retrieved the public
82         # verification key, nor things like 'k' or 'N'. If and when someone
83         # wants to get our contents, we'll pull from shares and fill those
84         # in.
85         self._uri = IMutableFileURI(myuri)
86         if not self._uri.is_readonly():
87             self._writekey = self._uri.writekey
88         self._readkey = self._uri.readkey
89         self._storage_index = self._uri.storage_index
90         self._fingerprint = self._uri.fingerprint
91         # the following values are learned during Retrieval
92         #  self._pubkey
93         #  self._required_shares
94         #  self._total_shares
95         # and these are needed for Publish. They are filled in by Retrieval
96         # if possible, otherwise by the first peer that Publish talks to.
97         self._privkey = None
98         self._encprivkey = None
99         return self
100
101     def create(self, initial_contents, keypair_generator=None):
102         """Call this when the filenode is first created. This will generate
103         the keys, generate the initial shares, wait until at least numpeers
104         are connected, allocate shares, and upload the initial
105         contents. Returns a Deferred that fires (with the MutableFileNode
106         instance you should use) when it completes.
107         """
108
109         d = defer.maybeDeferred(self._generate_pubprivkeys, keypair_generator)
110         d.addCallback(self._generated)
111         d.addCallback(lambda res: self._upload(initial_contents, None))
112         return d
113
114     def _generated(self, (pubkey, privkey) ):
115         self._pubkey, self._privkey = pubkey, privkey
116         pubkey_s = self._pubkey.serialize()
117         privkey_s = self._privkey.serialize()
118         self._writekey = hashutil.ssk_writekey_hash(privkey_s)
119         self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
120         self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
121         self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
122         self._readkey = self._uri.readkey
123         self._storage_index = self._uri.storage_index
124
125     def _generate_pubprivkeys(self, keypair_generator):
126         if keypair_generator:
127             return keypair_generator(self.SIGNATURE_KEY_SIZE)
128         else:
129             # RSA key generation for a 2048 bit key takes between 0.8 and 3.2 secs
130             signer = rsa.generate(self.SIGNATURE_KEY_SIZE)
131             verifier = signer.get_verifying_key()
132             return verifier, signer
133
134     def _encrypt_privkey(self, writekey, privkey):
135         enc = AES(writekey)
136         crypttext = enc.process(privkey)
137         return crypttext
138
139     def _decrypt_privkey(self, enc_privkey):
140         enc = AES(self._writekey)
141         privkey = enc.process(enc_privkey)
142         return privkey
143
144     def _populate_pubkey(self, pubkey):
145         self._pubkey = pubkey
146     def _populate_required_shares(self, required_shares):
147         self._required_shares = required_shares
148     def _populate_total_shares(self, total_shares):
149         self._total_shares = total_shares
150
151     def _populate_privkey(self, privkey):
152         self._privkey = privkey
153     def _populate_encprivkey(self, encprivkey):
154         self._encprivkey = encprivkey
155
156
157     def get_write_enabler(self, peerid):
158         assert len(peerid) == 20
159         return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
160     def get_renewal_secret(self, peerid):
161         assert len(peerid) == 20
162         crs = self._client.get_renewal_secret()
163         frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
164         return hashutil.bucket_renewal_secret_hash(frs, peerid)
165     def get_cancel_secret(self, peerid):
166         assert len(peerid) == 20
167         ccs = self._client.get_cancel_secret()
168         fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
169         return hashutil.bucket_cancel_secret_hash(fcs, peerid)
170
171     def get_writekey(self):
172         return self._writekey
173     def get_readkey(self):
174         return self._readkey
175     def get_storage_index(self):
176         return self._storage_index
177     def get_privkey(self):
178         return self._privkey
179     def get_encprivkey(self):
180         return self._encprivkey
181     def get_pubkey(self):
182         return self._pubkey
183
184     def get_required_shares(self):
185         return self._required_shares
186     def get_total_shares(self):
187         return self._total_shares
188
189     ####################################
190     # IFilesystemNode
191
192     def get_uri(self):
193         return self._uri.to_string()
194     def get_size(self):
195         return "?" # TODO: this is likely to cause problems, not being an int
196     def get_readonly(self):
197         if self.is_readonly():
198             return self
199         ro = MutableFileNode(self._client)
200         ro.init_from_uri(self._uri.get_readonly())
201         return ro
202
203     def get_readonly_uri(self):
204         return self._uri.get_readonly().to_string()
205
206     def is_mutable(self):
207         return self._uri.is_mutable()
208     def is_readonly(self):
209         return self._uri.is_readonly()
210
211     def __hash__(self):
212         return hash((self.__class__, self._uri))
213     def __cmp__(self, them):
214         if cmp(type(self), type(them)):
215             return cmp(type(self), type(them))
216         if cmp(self.__class__, them.__class__):
217             return cmp(self.__class__, them.__class__)
218         return cmp(self._uri, them._uri)
219
220     def get_verify_cap(self):
221         return IMutableFileURI(self._uri).get_verify_cap()
222
223     def get_repair_cap(self):
224         if self._uri.is_readonly():
225             return None
226         return self._uri
227
228     def _do_serialized(self, cb, *args, **kwargs):
229         # note: to avoid deadlock, this callable is *not* allowed to invoke
230         # other serialized methods within this (or any other)
231         # MutableFileNode. The callable should be a bound method of this same
232         # MFN instance.
233         d = defer.Deferred()
234         self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
235         # we need to put off d.callback until this Deferred is finished being
236         # processed. Otherwise the caller's subsequent activities (like,
237         # doing other things with this node) can cause reentrancy problems in
238         # the Deferred code itself
239         self._serializer.addBoth(lambda res: eventually(d.callback, res))
240         # add a log.err just in case something really weird happens, because
241         # self._serializer stays around forever, therefore we won't see the
242         # usual Unhandled Error in Deferred that would give us a hint.
243         self._serializer.addErrback(log.err)
244         return d
245
246     #################################
247     # ICheckable
248
249     def check(self, monitor, verify=False, add_lease=False):
250         checker = self.checker_class(self, monitor)
251         return checker.check(verify, add_lease)
252
253     def check_and_repair(self, monitor, verify=False, add_lease=False):
254         checker = self.check_and_repairer_class(self, monitor)
255         return checker.check(verify, add_lease)
256
257     #################################
258     # IRepairable
259
260     def repair(self, check_results, force=False):
261         assert ICheckResults(check_results)
262         r = Repairer(self, check_results)
263         d = r.start(force)
264         return d
265
266
267     #################################
268     # IMutableFileNode
269
270     # allow the use of IDownloadTarget
271     def download(self, target):
272         # fake it. TODO: make this cleaner.
273         d = self.download_best_version()
274         def _done(data):
275             target.open(len(data))
276             target.write(data)
277             target.close()
278             return target.finish()
279         d.addCallback(_done)
280         return d
281
282
283     # new API
284
285     def download_best_version(self):
286         return self._do_serialized(self._download_best_version)
287     def _download_best_version(self):
288         servermap = ServerMap()
289         d = self._try_once_to_download_best_version(servermap, MODE_READ)
290         def _maybe_retry(f):
291             f.trap(NotEnoughSharesError)
292             # the download is worth retrying once. Make sure to use the
293             # old servermap, since it is what remembers the bad shares,
294             # but use MODE_WRITE to make it look for even more shares.
295             # TODO: consider allowing this to retry multiple times.. this
296             # approach will let us tolerate about 8 bad shares, I think.
297             return self._try_once_to_download_best_version(servermap,
298                                                            MODE_WRITE)
299         d.addErrback(_maybe_retry)
300         return d
301     def _try_once_to_download_best_version(self, servermap, mode):
302         d = self._update_servermap(servermap, mode)
303         d.addCallback(self._once_updated_download_best_version, servermap)
304         return d
305     def _once_updated_download_best_version(self, ignored, servermap):
306         goal = servermap.best_recoverable_version()
307         if not goal:
308             raise UnrecoverableFileError("no recoverable versions")
309         return self._try_once_to_download_version(servermap, goal)
310
311     def get_size_of_best_version(self):
312         d = self.get_servermap(MODE_READ)
313         def _got_servermap(smap):
314             ver = smap.best_recoverable_version()
315             if not ver:
316                 raise UnrecoverableFileError("no recoverable version")
317             return smap.size_of_version(ver)
318         d.addCallback(_got_servermap)
319         return d
320
321     def overwrite(self, new_contents):
322         return self._do_serialized(self._overwrite, new_contents)
323     def _overwrite(self, new_contents):
324         servermap = ServerMap()
325         d = self._update_servermap(servermap, mode=MODE_WRITE)
326         d.addCallback(lambda ignored: self._upload(new_contents, servermap))
327         return d
328
329
330     def modify(self, modifier, backoffer=None):
331         """I use a modifier callback to apply a change to the mutable file.
332         I implement the following pseudocode::
333
334          obtain_mutable_filenode_lock()
335          first_time = True
336          while True:
337            update_servermap(MODE_WRITE)
338            old = retrieve_best_version()
339            new = modifier(old, servermap, first_time)
340            first_time = False
341            if new == old: break
342            try:
343              publish(new)
344            except UncoordinatedWriteError, e:
345              backoffer(e)
346              continue
347            break
348          release_mutable_filenode_lock()
349
350         The idea is that your modifier function can apply a delta of some
351         sort, and it will be re-run as necessary until it succeeds. The
352         modifier must inspect the old version to see whether its delta has
353         already been applied: if so it should return the contents unmodified.
354
355         Note that the modifier is required to run synchronously, and must not
356         invoke any methods on this MutableFileNode instance.
357
358         The backoff-er is a callable that is responsible for inserting a
359         random delay between subsequent attempts, to help competing updates
360         from colliding forever. It is also allowed to give up after a while.
361         The backoffer is given two arguments: this MutableFileNode, and the
362         Failure object that contains the UncoordinatedWriteError. It should
363         return a Deferred that will fire when the next attempt should be
364         made, or return the Failure if the loop should give up. If
365         backoffer=None, a default one is provided which will perform
366         exponential backoff, and give up after 4 tries. Note that the
367         backoffer should not invoke any methods on this MutableFileNode
368         instance, and it needs to be highly conscious of deadlock issues.
369         """
370         return self._do_serialized(self._modify, modifier, backoffer)
371     def _modify(self, modifier, backoffer):
372         servermap = ServerMap()
373         if backoffer is None:
374             backoffer = BackoffAgent().delay
375         return self._modify_and_retry(servermap, modifier, backoffer, True)
376     def _modify_and_retry(self, servermap, modifier, backoffer, first_time):
377         d = self._modify_once(servermap, modifier, first_time)
378         def _retry(f):
379             f.trap(UncoordinatedWriteError)
380             d2 = defer.maybeDeferred(backoffer, self, f)
381             d2.addCallback(lambda ignored:
382                            self._modify_and_retry(servermap, modifier,
383                                                   backoffer, False))
384             return d2
385         d.addErrback(_retry)
386         return d
387     def _modify_once(self, servermap, modifier, first_time):
388         d = self._update_servermap(servermap, MODE_WRITE)
389         d.addCallback(self._once_updated_download_best_version, servermap)
390         def _apply(old_contents):
391             new_contents = modifier(old_contents, servermap, first_time)
392             if new_contents is None or new_contents == old_contents:
393                 # no changes need to be made
394                 if first_time:
395                     return
396                 # However, since Publish is not automatically doing a
397                 # recovery when it observes UCWE, we need to do a second
398                 # publish. See #551 for details. We'll basically loop until
399                 # we managed an uncontested publish.
400                 new_contents = old_contents
401             precondition(isinstance(new_contents, str),
402                          "Modifier function must return a string or None")
403             return self._upload(new_contents, servermap)
404         d.addCallback(_apply)
405         return d
406
407     def get_servermap(self, mode):
408         return self._do_serialized(self._get_servermap, mode)
409     def _get_servermap(self, mode):
410         servermap = ServerMap()
411         return self._update_servermap(servermap, mode)
412     def _update_servermap(self, servermap, mode):
413         u = ServermapUpdater(self, Monitor(), servermap, mode)
414         history = self._client.get_history()
415         if history:
416             history.notify_mapupdate(u.get_status())
417         return u.update()
418
419     def download_version(self, servermap, version, fetch_privkey=False):
420         return self._do_serialized(self._try_once_to_download_version,
421                                    servermap, version, fetch_privkey)
422     def _try_once_to_download_version(self, servermap, version,
423                                       fetch_privkey=False):
424         r = Retrieve(self, servermap, version, fetch_privkey)
425         history = self._client.get_history()
426         if history:
427             history.notify_retrieve(r.get_status())
428         return r.download()
429
430     def upload(self, new_contents, servermap):
431         return self._do_serialized(self._upload, new_contents, servermap)
432     def _upload(self, new_contents, servermap):
433         assert self._pubkey, "update_servermap must be called before publish"
434         p = Publish(self, servermap)
435         history = self._client.get_history()
436         if history:
437             history.notify_publish(p.get_status(), len(new_contents))
438         return p.publish(new_contents)