]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/filenode.py
rename "get_verifier()" to "get_verify_cap()"
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / filenode.py
1
2 import weakref, random
3 from twisted.application import service
4
5 from zope.interface import implements
6 from twisted.internet import defer, reactor
7 from foolscap.eventual import eventually
8 from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \
9      ICheckable, ICheckerResults, NotEnoughSharesError
10 from allmydata.util import hashutil, log
11 from allmydata.util.assertutil import precondition
12 from allmydata.uri import WriteableSSKFileURI
13 from allmydata.monitor import Monitor
14 from pycryptopp.publickey import rsa
15 from pycryptopp.cipher.aes import AES
16
17 from publish import Publish
18 from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \
19      ResponseCache, UncoordinatedWriteError
20 from servermap import ServerMap, ServermapUpdater
21 from retrieve import Retrieve
22 from checker import MutableChecker, MutableCheckAndRepairer
23 from repairer import Repairer
24
25
26 class BackoffAgent:
27     # these parameters are copied from foolscap.reconnector, which gets them
28     # from twisted.internet.protocol.ReconnectingClientFactory
29     initialDelay = 1.0
30     factor = 2.7182818284590451 # (math.e)
31     jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
32     maxRetries = 4
33
34     def __init__(self):
35         self._delay = self.initialDelay
36         self._count = 0
37     def delay(self, node, f):
38         self._count += 1
39         if self._count == 4:
40             return f
41         self._delay = self._delay * self.factor
42         self._delay = random.normalvariate(self._delay,
43                                            self._delay * self.jitter)
44         d = defer.Deferred()
45         reactor.callLater(self._delay, d.callback, None)
46         return d
47
48 # use client.create_mutable_file() to make one of these
49
50 class MutableFileNode:
51     implements(IMutableFileNode, ICheckable)
52     SIGNATURE_KEY_SIZE = 2048
53     checker_class = MutableChecker
54     check_and_repairer_class = MutableCheckAndRepairer
55
56     def __init__(self, client):
57         self._client = client
58         self._pubkey = None # filled in upon first read
59         self._privkey = None # filled in if we're mutable
60         # we keep track of the last encoding parameters that we use. These
61         # are updated upon retrieve, and used by publish. If we publish
62         # without ever reading (i.e. overwrite()), then we use these values.
63         defaults = client.get_encoding_parameters()
64         self._required_shares = defaults["k"]
65         self._total_shares = defaults["n"]
66         self._sharemap = {} # known shares, shnum-to-[nodeids]
67         self._cache = ResponseCache()
68
69         # all users of this MutableFileNode go through the serializer. This
70         # takes advantage of the fact that Deferreds discard the callbacks
71         # that they're done with, so we can keep using the same Deferred
72         # forever without consuming more and more memory.
73         self._serializer = defer.succeed(None)
74
75     def __repr__(self):
76         if hasattr(self, '_uri'):
77             return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
78         else:
79             return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)
80
81     def init_from_uri(self, myuri):
82         # we have the URI, but we have not yet retrieved the public
83         # verification key, nor things like 'k' or 'N'. If and when someone
84         # wants to get our contents, we'll pull from shares and fill those
85         # in.
86         self._uri = IMutableFileURI(myuri)
87         if not self._uri.is_readonly():
88             self._writekey = self._uri.writekey
89         self._readkey = self._uri.readkey
90         self._storage_index = self._uri.storage_index
91         self._fingerprint = self._uri.fingerprint
92         # the following values are learned during Retrieval
93         #  self._pubkey
94         #  self._required_shares
95         #  self._total_shares
96         # and these are needed for Publish. They are filled in by Retrieval
97         # if possible, otherwise by the first peer that Publish talks to.
98         self._privkey = None
99         self._encprivkey = None
100         return self
101
102     def create(self, initial_contents, keypair_generator=None):
103         """Call this when the filenode is first created. This will generate
104         the keys, generate the initial shares, wait until at least numpeers
105         are connected, allocate shares, and upload the initial
106         contents. Returns a Deferred that fires (with the MutableFileNode
107         instance you should use) when it completes.
108         """
109
110         d = defer.maybeDeferred(self._generate_pubprivkeys, keypair_generator)
111         d.addCallback(self._generated)
112         d.addCallback(lambda res: self._upload(initial_contents, None))
113         return d
114
115     def _generated(self, (pubkey, privkey) ):
116         self._pubkey, self._privkey = pubkey, privkey
117         pubkey_s = self._pubkey.serialize()
118         privkey_s = self._privkey.serialize()
119         self._writekey = hashutil.ssk_writekey_hash(privkey_s)
120         self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
121         self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
122         self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
123         self._readkey = self._uri.readkey
124         self._storage_index = self._uri.storage_index
125
126     def _generate_pubprivkeys(self, keypair_generator):
127         if keypair_generator:
128             return keypair_generator(self.SIGNATURE_KEY_SIZE)
129         else:
130             # RSA key generation for a 2048 bit key takes between 0.8 and 3.2 secs
131             signer = rsa.generate(self.SIGNATURE_KEY_SIZE)
132             verifier = signer.get_verifying_key()
133             return verifier, signer
134
135     def _encrypt_privkey(self, writekey, privkey):
136         enc = AES(writekey)
137         crypttext = enc.process(privkey)
138         return crypttext
139
140     def _decrypt_privkey(self, enc_privkey):
141         enc = AES(self._writekey)
142         privkey = enc.process(enc_privkey)
143         return privkey
144
145     def _populate_pubkey(self, pubkey):
146         self._pubkey = pubkey
147     def _populate_required_shares(self, required_shares):
148         self._required_shares = required_shares
149     def _populate_total_shares(self, total_shares):
150         self._total_shares = total_shares
151
152     def _populate_privkey(self, privkey):
153         self._privkey = privkey
154     def _populate_encprivkey(self, encprivkey):
155         self._encprivkey = encprivkey
156
157
158     def get_write_enabler(self, peerid):
159         assert len(peerid) == 20
160         return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
161     def get_renewal_secret(self, peerid):
162         assert len(peerid) == 20
163         crs = self._client.get_renewal_secret()
164         frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
165         return hashutil.bucket_renewal_secret_hash(frs, peerid)
166     def get_cancel_secret(self, peerid):
167         assert len(peerid) == 20
168         ccs = self._client.get_cancel_secret()
169         fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
170         return hashutil.bucket_cancel_secret_hash(fcs, peerid)
171
172     def get_writekey(self):
173         return self._writekey
174     def get_readkey(self):
175         return self._readkey
176     def get_storage_index(self):
177         return self._storage_index
178     def get_privkey(self):
179         return self._privkey
180     def get_encprivkey(self):
181         return self._encprivkey
182     def get_pubkey(self):
183         return self._pubkey
184
185     def get_required_shares(self):
186         return self._required_shares
187     def get_total_shares(self):
188         return self._total_shares
189
190     ####################################
191     # IFilesystemNode
192
193     def get_uri(self):
194         return self._uri.to_string()
195     def get_size(self):
196         return "?" # TODO: this is likely to cause problems, not being an int
197     def get_readonly(self):
198         if self.is_readonly():
199             return self
200         ro = MutableFileNode(self._client)
201         ro.init_from_uri(self._uri.get_readonly())
202         return ro
203
204     def get_readonly_uri(self):
205         return self._uri.get_readonly().to_string()
206
207     def is_mutable(self):
208         return self._uri.is_mutable()
209     def is_readonly(self):
210         return self._uri.is_readonly()
211
212     def __hash__(self):
213         return hash((self.__class__, self._uri))
214     def __cmp__(self, them):
215         if cmp(type(self), type(them)):
216             return cmp(type(self), type(them))
217         if cmp(self.__class__, them.__class__):
218             return cmp(self.__class__, them.__class__)
219         return cmp(self._uri, them._uri)
220
221     def get_verify_cap(self):
222         return IMutableFileURI(self._uri).get_verify_cap()
223
224     def _do_serialized(self, cb, *args, **kwargs):
225         # note: to avoid deadlock, this callable is *not* allowed to invoke
226         # other serialized methods within this (or any other)
227         # MutableFileNode. The callable should be a bound method of this same
228         # MFN instance.
229         d = defer.Deferred()
230         self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
231         # we need to put off d.callback until this Deferred is finished being
232         # processed. Otherwise the caller's subsequent activities (like,
233         # doing other things with this node) can cause reentrancy problems in
234         # the Deferred code itself
235         self._serializer.addBoth(lambda res: eventually(d.callback, res))
236         # add a log.err just in case something really weird happens, because
237         # self._serializer stays around forever, therefore we won't see the
238         # usual Unhandled Error in Deferred that would give us a hint.
239         self._serializer.addErrback(log.err)
240         return d
241
242     #################################
243     # ICheckable
244
245     def check(self, monitor, verify=False):
246         checker = self.checker_class(self, monitor)
247         return checker.check(verify)
248
249     def check_and_repair(self, monitor, verify=False):
250         checker = self.check_and_repairer_class(self, monitor)
251         return checker.check(verify)
252
253     #################################
254     # IRepairable
255
256     def repair(self, checker_results, force=False):
257         assert ICheckerResults(checker_results)
258         r = Repairer(self, checker_results)
259         d = r.start(force)
260         return d
261
262
263     #################################
264     # IMutableFileNode
265
266     # allow the use of IDownloadTarget
267     def download(self, target):
268         # fake it. TODO: make this cleaner.
269         d = self.download_best_version()
270         def _done(data):
271             target.open(len(data))
272             target.write(data)
273             target.close()
274             return target.finish()
275         d.addCallback(_done)
276         return d
277
278
279     # new API
280
281     def download_best_version(self):
282         return self._do_serialized(self._download_best_version)
283     def _download_best_version(self):
284         servermap = ServerMap()
285         d = self._try_once_to_download_best_version(servermap, MODE_READ)
286         def _maybe_retry(f):
287             f.trap(NotEnoughSharesError)
288             # the download is worth retrying once. Make sure to use the
289             # old servermap, since it is what remembers the bad shares,
290             # but use MODE_WRITE to make it look for even more shares.
291             # TODO: consider allowing this to retry multiple times.. this
292             # approach will let us tolerate about 8 bad shares, I think.
293             return self._try_once_to_download_best_version(servermap,
294                                                            MODE_WRITE)
295         d.addErrback(_maybe_retry)
296         return d
297     def _try_once_to_download_best_version(self, servermap, mode):
298         d = self._update_servermap(servermap, mode)
299         d.addCallback(self._once_updated_download_best_version, servermap)
300         return d
301     def _once_updated_download_best_version(self, ignored, servermap):
302         goal = servermap.best_recoverable_version()
303         if not goal:
304             raise UnrecoverableFileError("no recoverable versions")
305         return self._try_once_to_download_version(servermap, goal)
306
307     def get_size_of_best_version(self):
308         d = self.get_servermap(MODE_READ)
309         def _got_servermap(smap):
310             ver = smap.best_recoverable_version()
311             if not ver:
312                 raise UnrecoverableFileError("no recoverable version")
313             return smap.size_of_version(ver)
314         d.addCallback(_got_servermap)
315         return d
316
317     def overwrite(self, new_contents):
318         return self._do_serialized(self._overwrite, new_contents)
319     def _overwrite(self, new_contents):
320         servermap = ServerMap()
321         d = self._update_servermap(servermap, mode=MODE_WRITE)
322         d.addCallback(lambda ignored: self._upload(new_contents, servermap))
323         return d
324
325
326     def modify(self, modifier, backoffer=None):
327         """I use a modifier callback to apply a change to the mutable file.
328         I implement the following pseudocode::
329
330          obtain_mutable_filenode_lock()
331          first_time = True
332          while True:
333            update_servermap(MODE_WRITE)
334            old = retrieve_best_version()
335            new = modifier(old, servermap, first_time)
336            first_time = False
337            if new == old: break
338            try:
339              publish(new)
340            except UncoordinatedWriteError, e:
341              backoffer(e)
342              continue
343            break
344          release_mutable_filenode_lock()
345
346         The idea is that your modifier function can apply a delta of some
347         sort, and it will be re-run as necessary until it succeeds. The
348         modifier must inspect the old version to see whether its delta has
349         already been applied: if so it should return the contents unmodified.
350
351         Note that the modifier is required to run synchronously, and must not
352         invoke any methods on this MutableFileNode instance.
353
354         The backoff-er is a callable that is responsible for inserting a
355         random delay between subsequent attempts, to help competing updates
356         from colliding forever. It is also allowed to give up after a while.
357         The backoffer is given two arguments: this MutableFileNode, and the
358         Failure object that contains the UncoordinatedWriteError. It should
359         return a Deferred that will fire when the next attempt should be
360         made, or return the Failure if the loop should give up. If
361         backoffer=None, a default one is provided which will perform
362         exponential backoff, and give up after 4 tries. Note that the
363         backoffer should not invoke any methods on this MutableFileNode
364         instance, and it needs to be highly conscious of deadlock issues.
365         """
366         return self._do_serialized(self._modify, modifier, backoffer)
367     def _modify(self, modifier, backoffer):
368         servermap = ServerMap()
369         if backoffer is None:
370             backoffer = BackoffAgent().delay
371         return self._modify_and_retry(servermap, modifier, backoffer, True)
372     def _modify_and_retry(self, servermap, modifier, backoffer, first_time):
373         d = self._modify_once(servermap, modifier, first_time)
374         def _retry(f):
375             f.trap(UncoordinatedWriteError)
376             d2 = defer.maybeDeferred(backoffer, self, f)
377             d2.addCallback(lambda ignored:
378                            self._modify_and_retry(servermap, modifier,
379                                                   backoffer, False))
380             return d2
381         d.addErrback(_retry)
382         return d
383     def _modify_once(self, servermap, modifier, first_time):
384         d = self._update_servermap(servermap, MODE_WRITE)
385         d.addCallback(self._once_updated_download_best_version, servermap)
386         def _apply(old_contents):
387             new_contents = modifier(old_contents, servermap, first_time)
388             if new_contents is None or new_contents == old_contents:
389                 # no changes need to be made
390                 if first_time:
391                     return
392                 # However, since Publish is not automatically doing a
393                 # recovery when it observes UCWE, we need to do a second
394                 # publish. See #551 for details. We'll basically loop until
395                 # we managed an uncontested publish.
396                 new_contents = old_contents
397             precondition(isinstance(new_contents, str),
398                          "Modifier function must return a string or None")
399             return self._upload(new_contents, servermap)
400         d.addCallback(_apply)
401         return d
402
403     def get_servermap(self, mode):
404         return self._do_serialized(self._get_servermap, mode)
405     def _get_servermap(self, mode):
406         servermap = ServerMap()
407         return self._update_servermap(servermap, mode)
408     def _update_servermap(self, servermap, mode):
409         u = ServermapUpdater(self, Monitor(), servermap, mode)
410         self._client.notify_mapupdate(u.get_status())
411         return u.update()
412
413     def download_version(self, servermap, version, fetch_privkey=False):
414         return self._do_serialized(self._try_once_to_download_version,
415                                    servermap, version, fetch_privkey)
416     def _try_once_to_download_version(self, servermap, version,
417                                       fetch_privkey=False):
418         r = Retrieve(self, servermap, version, fetch_privkey)
419         self._client.notify_retrieve(r.get_status())
420         return r.download()
421
422     def upload(self, new_contents, servermap):
423         return self._do_serialized(self._upload, new_contents, servermap)
424     def _upload(self, new_contents, servermap):
425         assert self._pubkey, "update_servermap must be called before publish"
426         p = Publish(self, servermap)
427         self._client.notify_publish(p.get_status(), len(new_contents))
428         return p.publish(new_contents)
429
430
431
432
433 class MutableWatcher(service.MultiService):
434     MAX_MAPUPDATE_STATUSES = 20
435     MAX_PUBLISH_STATUSES = 20
436     MAX_RETRIEVE_STATUSES = 20
437     name = "mutable-watcher"
438
439     def __init__(self, stats_provider=None):
440         service.MultiService.__init__(self)
441         self.stats_provider = stats_provider
442         self._all_mapupdate_status = weakref.WeakKeyDictionary()
443         self._recent_mapupdate_status = []
444         self._all_publish_status = weakref.WeakKeyDictionary()
445         self._recent_publish_status = []
446         self._all_retrieve_status = weakref.WeakKeyDictionary()
447         self._recent_retrieve_status = []
448
449
450     def notify_mapupdate(self, p):
451         self._all_mapupdate_status[p] = None
452         self._recent_mapupdate_status.append(p)
453         while len(self._recent_mapupdate_status) > self.MAX_MAPUPDATE_STATUSES:
454             self._recent_mapupdate_status.pop(0)
455
456     def notify_publish(self, p, size):
457         self._all_publish_status[p] = None
458         self._recent_publish_status.append(p)
459         if self.stats_provider:
460             self.stats_provider.count('mutable.files_published', 1)
461             # We must be told bytes_published as an argument, since the
462             # publish_status does not yet know how much data it will be asked
463             # to send. When we move to MDMF we'll need to find a better way
464             # to handle this.
465             self.stats_provider.count('mutable.bytes_published', size)
466         while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES:
467             self._recent_publish_status.pop(0)
468
469     def notify_retrieve(self, r):
470         self._all_retrieve_status[r] = None
471         self._recent_retrieve_status.append(r)
472         if self.stats_provider:
473             self.stats_provider.count('mutable.files_retrieved', 1)
474             self.stats_provider.count('mutable.bytes_retrieved', r.get_size())
475         while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES:
476             self._recent_retrieve_status.pop(0)
477
478
479     def list_all_mapupdate_statuses(self):
480         return self._all_mapupdate_status.keys()
481     def list_all_publish_statuses(self):
482         return self._all_publish_status.keys()
483     def list_all_retrieve_statuses(self):
484         return self._all_retrieve_status.keys()