3 from twisted.application import service
5 from zope.interface import implements
6 from twisted.internet import defer, reactor
7 from foolscap.eventual import eventually
8 from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \
9 ICheckable, ICheckerResults, NotEnoughSharesError
10 from allmydata.util import hashutil, log
11 from allmydata.util.assertutil import precondition
12 from allmydata.uri import WriteableSSKFileURI
13 from allmydata.monitor import Monitor
14 from pycryptopp.publickey import rsa
15 from pycryptopp.cipher.aes import AES
17 from publish import Publish
18 from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \
19 ResponseCache, UncoordinatedWriteError
20 from servermap import ServerMap, ServermapUpdater
21 from retrieve import Retrieve
22 from checker import MutableChecker, MutableCheckAndRepairer
23 from repairer import Repairer
    # NOTE(review): only fragments of this backoff-agent class are visible in
    # this chunk; the class statement, the __init__ def line, the retry-count
    # bookkeeping, and the Deferred that delay() presumably creates and
    # returns are all on hidden lines -- confirm against the full file.
    # these parameters are copied from foolscap.reconnector, which gets them
    # from twisted.internet.protocol.ReconnectingClientFactory
    factor = 2.7182818284590451 # (math.e)
    jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole

    # (body of __init__, whose def line is not visible here)
        self._delay = self.initialDelay

    def delay(self, node, f):
        # Grow the current delay exponentially, then add gaussian jitter so
        # competing writers do not retry in lockstep.
        self._delay = self._delay * self.factor
        self._delay = random.normalvariate(self._delay,
                                           self._delay * self.jitter)
        # Fire a Deferred ('d', created on a line not visible in this chunk)
        # after the computed delay.
        reactor.callLater(self._delay, d.callback, None)
# use client.create_mutable_file() to make one of these

class MutableFileNode:
    """A node for a single SSK mutable file.

    Holds the file's keys/caps and the last-seen encoding parameters, and
    serializes all read/write operations through one long-lived Deferred.
    """
    implements(IMutableFileNode, ICheckable)
    SIGNATURE_KEY_SIZE = 2048  # bits, for the RSA signature keypair
    checker_class = MutableChecker
    check_and_repairer_class = MutableCheckAndRepairer

    def __init__(self, client):
        # NOTE(review): at least one __init__ line is not visible in this
        # chunk -- presumably the one storing 'client' as self._client (other
        # methods read self._client) -- confirm against the full file.
        self._pubkey = None # filled in upon first read
        self._privkey = None # filled in if we're mutable
        # we keep track of the last encoding parameters that we use. These
        # are updated upon retrieve, and used by publish. If we publish
        # without ever reading (i.e. overwrite()), then we use these values.
        defaults = client.get_encoding_parameters()
        self._required_shares = defaults["k"]
        self._total_shares = defaults["n"]
        self._sharemap = {} # known shares, shnum-to-[nodeids]
        self._cache = ResponseCache()

        # all users of this MutableFileNode go through the serializer. This
        # takes advantage of the fact that Deferreds discard the callbacks
        # that they're done with, so we can keep using the same Deferred
        # forever without consuming more and more memory.
        self._serializer = defer.succeed(None)
    # NOTE(review): the 'def __repr__(self):' line is not visible in this
    # chunk; the two returns below are its body.
        if hasattr(self, '_uri'):
            # initialized node: show RO/RW mode and an abbreviated cap
            return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
        # node not yet initialized via create() or init_from_uri()
        return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)
    def init_from_uri(self, myuri):
        # we have the URI, but we have not yet retrieved the public
        # verification key, nor things like 'k' or 'N'. If and when someone
        # wants to get our contents, we'll pull from shares and fill those
        self._uri = IMutableFileURI(myuri)
        if not self._uri.is_readonly():
            # only a write-cap carries the writekey
            self._writekey = self._uri.writekey
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        self._fingerprint = self._uri.fingerprint
        # the following values are learned during Retrieval
        #  self._required_shares
        # and these are needed for Publish. They are filled in by Retrieval
        # if possible, otherwise by the first peer that Publish talks to.
        self._encprivkey = None
        # NOTE(review): trailing lines (presumably 'return self') are not
        # visible in this chunk -- confirm against the full file.
    # NOTE(review): the closing quotes of this docstring and the method's
    # tail (presumably a callback that returns self, plus 'return d') are on
    # lines not visible in this chunk.
    def create(self, initial_contents, keypair_generator=None):
        """Call this when the filenode is first created. This will generate
        the keys, generate the initial shares, wait until at least numpeers
        are connected, allocate shares, and upload the initial
        contents. Returns a Deferred that fires (with the MutableFileNode
        instance you should use) when it completes.
        d = defer.maybeDeferred(self._generate_pubprivkeys, keypair_generator)
        d.addCallback(self._generated)
        d.addCallback(lambda res: self._upload(initial_contents, None))
115 def _generated(self, (pubkey, privkey) ):
116 self._pubkey, self._privkey = pubkey, privkey
117 pubkey_s = self._pubkey.serialize()
118 privkey_s = self._privkey.serialize()
119 self._writekey = hashutil.ssk_writekey_hash(privkey_s)
120 self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
121 self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
122 self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
123 self._readkey = self._uri.readkey
124 self._storage_index = self._uri.storage_index
    def _generate_pubprivkeys(self, keypair_generator):
        # A caller-supplied generator takes precedence (lets tests avoid the
        # expensive RSA generation below).
        if keypair_generator:
            return keypair_generator(self.SIGNATURE_KEY_SIZE)
        # NOTE(review): one line between this branch and the comment below is
        # not visible in this chunk (presumably an 'else:') -- confirm.
        # RSA key generation for a 2048 bit key takes between 0.8 and 3.2 secs
        signer = rsa.generate(self.SIGNATURE_KEY_SIZE)
        verifier = signer.get_verifying_key()
        return verifier, signer
    def _encrypt_privkey(self, writekey, privkey):
        # NOTE(review): the construction of 'enc' (presumably AES(writekey),
        # matching _decrypt_privkey below) and the return of the ciphertext
        # are on lines not visible in this chunk.
        crypttext = enc.process(privkey)

    def _decrypt_privkey(self, enc_privkey):
        # symmetric inverse of _encrypt_privkey: AES keyed by our writekey
        enc = AES(self._writekey)
        privkey = enc.process(enc_privkey)
        # NOTE(review): the 'return privkey' line is not visible here.

    # Simple setters, used to fill in values learned from shares during
    # retrieval / servermap update.
    def _populate_pubkey(self, pubkey):
        self._pubkey = pubkey
    def _populate_required_shares(self, required_shares):
        self._required_shares = required_shares
    def _populate_total_shares(self, total_shares):
        self._total_shares = total_shares

    def _populate_privkey(self, privkey):
        self._privkey = privkey
    def _populate_encprivkey(self, encprivkey):
        self._encprivkey = encprivkey
158 def get_write_enabler(self, peerid):
159 assert len(peerid) == 20
160 return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
161 def get_renewal_secret(self, peerid):
162 assert len(peerid) == 20
163 crs = self._client.get_renewal_secret()
164 frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
165 return hashutil.bucket_renewal_secret_hash(frs, peerid)
166 def get_cancel_secret(self, peerid):
167 assert len(peerid) == 20
168 ccs = self._client.get_cancel_secret()
169 fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
170 return hashutil.bucket_cancel_secret_hash(fcs, peerid)
    def get_writekey(self):
        # write-cap secret; set by create()/init_from_uri() for writeable nodes
        return self._writekey
    def get_readkey(self):
        # NOTE(review): the body (presumably 'return self._readkey') is not
        # visible in this chunk.
    def get_storage_index(self):
        return self._storage_index
    def get_privkey(self):
        # NOTE(review): body not visible in this chunk.
    def get_encprivkey(self):
        return self._encprivkey
    def get_pubkey(self):
        # NOTE(review): body not visible in this chunk.

    def get_required_shares(self):
        # 'k' from the last-seen (or default) encoding parameters
        return self._required_shares
    def get_total_shares(self):
        # 'N' from the last-seen (or default) encoding parameters
        return self._total_shares
    ####################################

    # NOTE(review): the def lines for the two one-line bodies below are not
    # visible in this chunk; they appear to belong to get_uri() and
    # get_size() -- confirm against the full file.
        return self._uri.to_string()
        return "?" # TODO: this is likely to cause problems, not being an int
    def get_readonly(self):
        """Return a read-only node for the same file (self if already RO)."""
        if self.is_readonly():
            # NOTE(review): the 'return self' line is not visible here.
        ro = MutableFileNode(self._client)
        ro.init_from_uri(self._uri.get_readonly())
        # NOTE(review): the trailing 'return ro' line is not visible here.
204 def get_readonly_uri(self):
205 return self._uri.get_readonly().to_string()
    def is_mutable(self):
        # delegate to the underlying URI object
        return self._uri.is_mutable()
    def is_readonly(self):
        # delegate to the underlying URI object
        return self._uri.is_readonly()
213 return hash((self.__class__, self._uri))
214 def __cmp__(self, them):
215 if cmp(type(self), type(them)):
216 return cmp(type(self), type(them))
217 if cmp(self.__class__, them.__class__):
218 return cmp(self.__class__, them.__class__)
219 return cmp(self._uri, them._uri)
221 def get_verify_cap(self):
222 return IMutableFileURI(self._uri).get_verify_cap()
    def _do_serialized(self, cb, *args, **kwargs):
        # note: to avoid deadlock, this callable is *not* allowed to invoke
        # other serialized methods within this (or any other)
        # MutableFileNode. The callable should be a bound method of this same
        # NOTE(review): the creation of the result Deferred 'd' (used by the
        # addBoth below) and the trailing 'return d' are on lines not visible
        # in this chunk.
        self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
        # we need to put off d.callback until this Deferred is finished being
        # processed. Otherwise the caller's subsequent activities (like,
        # doing other things with this node) can cause reentrancy problems in
        # the Deferred code itself
        self._serializer.addBoth(lambda res: eventually(d.callback, res))
        # add a log.err just in case something really weird happens, because
        # self._serializer stays around forever, therefore we won't see the
        # usual Unhandled Error in Deferred that would give us a hint.
        self._serializer.addErrback(log.err)
242 #################################
245 def check(self, monitor, verify=False):
246 checker = self.checker_class(self, monitor)
247 return checker.check(verify)
249 def check_and_repair(self, monitor, verify=False):
250 checker = self.check_and_repairer_class(self, monitor)
251 return checker.check(verify)
    #################################

    def repair(self, checker_results, force=False):
        """Repair the file based on prior checker results."""
        assert ICheckerResults(checker_results)
        r = Repairer(self, checker_results)
        # NOTE(review): the lines that start the repair (presumably using
        # 'force') and return its result are not visible in this chunk.
    #################################

    # allow the use of IDownloadTarget
    def download(self, target):
        # fake it. TODO: make this cleaner.
        d = self.download_best_version()
        # NOTE(review): the callback definition that receives 'data', plus
        # the intermediate target.write/close calls, are on lines not visible
        # in this chunk.
            target.open(len(data))
            return target.finish()
    def download_best_version(self):
        """Serialized entry point: fetch the best recoverable version."""
        return self._do_serialized(self._download_best_version)
    def _download_best_version(self):
        servermap = ServerMap()
        d = self._try_once_to_download_best_version(servermap, MODE_READ)
        # NOTE(review): the def line of this errback (presumably
        # 'def _maybe_retry(f):'), the continuation argument of the call
        # below (presumably MODE_WRITE), and the trailing 'return d' are on
        # lines not visible in this chunk.
            f.trap(NotEnoughSharesError)
            # the download is worth retrying once. Make sure to use the
            # old servermap, since it is what remembers the bad shares,
            # but use MODE_WRITE to make it look for even more shares.
            # TODO: consider allowing this to retry multiple times.. this
            # approach will let us tolerate about 8 bad shares, I think.
            return self._try_once_to_download_best_version(servermap,
        d.addErrback(_maybe_retry)
    def _try_once_to_download_best_version(self, servermap, mode):
        # update the servermap, then download whichever version it reports
        # as best
        d = self._update_servermap(servermap, mode)
        d.addCallback(self._once_updated_download_best_version, servermap)
        # NOTE(review): the trailing 'return d' is not visible in this chunk.
    def _once_updated_download_best_version(self, ignored, servermap):
        goal = servermap.best_recoverable_version()
        # NOTE(review): the guard line (presumably 'if not goal:') is not
        # visible in this chunk.
            raise UnrecoverableFileError("no recoverable versions")
        return self._try_once_to_download_version(servermap, goal)
    def get_size_of_best_version(self):
        """Return a Deferred firing with the size of the best recoverable
        version, or failing with UnrecoverableFileError."""
        d = self.get_servermap(MODE_READ)
        def _got_servermap(smap):
            ver = smap.best_recoverable_version()
            # NOTE(review): the guard line (presumably 'if not ver:') is not
            # visible in this chunk.
                raise UnrecoverableFileError("no recoverable version")
            return smap.size_of_version(ver)
        d.addCallback(_got_servermap)
        # NOTE(review): the trailing 'return d' is not visible in this chunk.
    def overwrite(self, new_contents):
        """Serialized entry point: replace the file's contents entirely."""
        return self._do_serialized(self._overwrite, new_contents)
    def _overwrite(self, new_contents):
        # update the servermap in MODE_WRITE, then publish the new contents
        servermap = ServerMap()
        d = self._update_servermap(servermap, mode=MODE_WRITE)
        d.addCallback(lambda ignored: self._upload(new_contents, servermap))
        # NOTE(review): the trailing lines (presumably 'return d') are not
        # visible in this chunk.
    # NOTE(review): several lines of modify()'s docstring/pseudocode are not
    # visible in this chunk, including the closing quotes that should
    # precede the final 'return' statement -- confirm against the full file.
    def modify(self, modifier, backoffer=None):
        """I use a modifier callback to apply a change to the mutable file.
        I implement the following pseudocode::

         obtain_mutable_filenode_lock()
           update_servermap(MODE_WRITE)
           old = retrieve_best_version()
           new = modifier(old, servermap, first_time)
         except UncoordinatedWriteError, e:
         release_mutable_filenode_lock()

        The idea is that your modifier function can apply a delta of some
        sort, and it will be re-run as necessary until it succeeds. The
        modifier must inspect the old version to see whether its delta has
        already been applied: if so it should return the contents unmodified.

        Note that the modifier is required to run synchronously, and must not
        invoke any methods on this MutableFileNode instance.

        The backoff-er is a callable that is responsible for inserting a
        random delay between subsequent attempts, to help competing updates
        from colliding forever. It is also allowed to give up after a while.
        The backoffer is given two arguments: this MutableFileNode, and the
        Failure object that contains the UncoordinatedWriteError. It should
        return a Deferred that will fire when the next attempt should be
        made, or return the Failure if the loop should give up. If
        backoffer=None, a default one is provided which will perform
        exponential backoff, and give up after 4 tries. Note that the
        backoffer should not invoke any methods on this MutableFileNode
        instance, and it needs to be highly conscious of deadlock issues.
        return self._do_serialized(self._modify, modifier, backoffer)
367 def _modify(self, modifier, backoffer):
368 servermap = ServerMap()
369 if backoffer is None:
370 backoffer = BackoffAgent().delay
371 return self._modify_and_retry(servermap, modifier, backoffer, True)
    def _modify_and_retry(self, servermap, modifier, backoffer, first_time):
        # try one modify pass; on UncoordinatedWriteError, ask the backoffer
        # to wait and then recurse with first_time=False
        d = self._modify_once(servermap, modifier, first_time)
        # NOTE(review): the errback's def line (presumably 'def _retry(f):'),
        # the continuation arguments of the recursive call below, and the
        # trailing addErrback/'return d' lines are not visible in this chunk.
            f.trap(UncoordinatedWriteError)
            d2 = defer.maybeDeferred(backoffer, self, f)
            d2.addCallback(lambda ignored:
                           self._modify_and_retry(servermap, modifier,
    def _modify_once(self, servermap, modifier, first_time):
        # one pass: update the servermap, fetch the best version, apply the
        # modifier, and publish the result
        d = self._update_servermap(servermap, MODE_WRITE)
        d.addCallback(self._once_updated_download_best_version, servermap)
        def _apply(old_contents):
            new_contents = modifier(old_contents, servermap, first_time)
            if new_contents is None or new_contents == old_contents:
                # no changes need to be made
                # NOTE(review): two lines are hidden here (presumably
                # 'if first_time: return') -- confirm against the full file.
                # However, since Publish is not automatically doing a
                # recovery when it observes UCWE, we need to do a second
                # publish. See #551 for details. We'll basically loop until
                # we managed an uncontested publish.
                new_contents = old_contents
            precondition(isinstance(new_contents, str),
                         "Modifier function must return a string or None")
            return self._upload(new_contents, servermap)
        d.addCallback(_apply)
        # NOTE(review): the trailing 'return d' is not visible in this chunk.
    def get_servermap(self, mode):
        """Serialized entry point: build and update a fresh ServerMap."""
        return self._do_serialized(self._get_servermap, mode)
    def _get_servermap(self, mode):
        servermap = ServerMap()
        return self._update_servermap(servermap, mode)
    def _update_servermap(self, servermap, mode):
        u = ServermapUpdater(self, Monitor(), servermap, mode)
        # let the client's watcher record this operation's status
        self._client.notify_mapupdate(u.get_status())
        # NOTE(review): the lines that start the update and return its
        # result are not visible in this chunk.
    def download_version(self, servermap, version, fetch_privkey=False):
        """Serialized entry point: download one specific version."""
        return self._do_serialized(self._try_once_to_download_version,
                                   servermap, version, fetch_privkey)
    def _try_once_to_download_version(self, servermap, version,
                                      fetch_privkey=False):
        r = Retrieve(self, servermap, version, fetch_privkey)
        # let the client's watcher record this operation's status
        self._client.notify_retrieve(r.get_status())
        # NOTE(review): the line that starts the retrieve and returns its
        # result is not visible in this chunk.
    def upload(self, new_contents, servermap):
        """Serialized entry point: publish new_contents via the given map."""
        return self._do_serialized(self._upload, new_contents, servermap)
424 def _upload(self, new_contents, servermap):
425 assert self._pubkey, "update_servermap must be called before publish"
426 p = Publish(self, servermap)
427 self._client.notify_publish(p.get_status(), len(new_contents))
428 return p.publish(new_contents)
class MutableWatcher(service.MultiService):
    """Tracks status objects for mutable-file operations: weak maps of every
    status ever seen, plus bounded lists of the most recent ones."""
    MAX_MAPUPDATE_STATUSES = 20  # cap on the 'recent' history lists
    MAX_PUBLISH_STATUSES = 20
    MAX_RETRIEVE_STATUSES = 20
    name = "mutable-watcher"
439 def __init__(self, stats_provider=None):
440 service.MultiService.__init__(self)
441 self.stats_provider = stats_provider
442 self._all_mapupdate_status = weakref.WeakKeyDictionary()
443 self._recent_mapupdate_status = []
444 self._all_publish_status = weakref.WeakKeyDictionary()
445 self._recent_publish_status = []
446 self._all_retrieve_status = weakref.WeakKeyDictionary()
447 self._recent_retrieve_status = []
450 def notify_mapupdate(self, p):
451 self._all_mapupdate_status[p] = None
452 self._recent_mapupdate_status.append(p)
453 while len(self._recent_mapupdate_status) > self.MAX_MAPUPDATE_STATUSES:
454 self._recent_mapupdate_status.pop(0)
    def notify_publish(self, p, size):
        """Record a publish status and bump publish stats counters."""
        self._all_publish_status[p] = None
        self._recent_publish_status.append(p)
        if self.stats_provider:
            self.stats_provider.count('mutable.files_published', 1)
            # We must be told bytes_published as an argument, since the
            # publish_status does not yet know how much data it will be asked
            # to send. When we move to MDMF we'll need to find a better way
            self.stats_provider.count('mutable.bytes_published', size)
        while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES:
            self._recent_publish_status.pop(0)
469 def notify_retrieve(self, r):
470 self._all_retrieve_status[r] = None
471 self._recent_retrieve_status.append(r)
472 if self.stats_provider:
473 self.stats_provider.count('mutable.files_retrieved', 1)
474 self.stats_provider.count('mutable.bytes_retrieved', r.get_size())
475 while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES:
476 self._recent_retrieve_status.pop(0)
    def list_all_mapupdate_statuses(self):
        # keys of the WeakKeyDictionary: every status object still alive
        return self._all_mapupdate_status.keys()
    def list_all_publish_statuses(self):
        return self._all_publish_status.keys()
    def list_all_retrieve_statuses(self):
        return self._all_retrieve_status.keys()