import random

from zope.interface import implements
from twisted.internet import defer, reactor
from foolscap.api import eventually
from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \
     ICheckable, ICheckResults, NotEnoughSharesError
from allmydata.util import hashutil, log
from allmydata.util.assertutil import precondition
from allmydata.uri import WriteableSSKFileURI
from allmydata.monitor import Monitor
from pycryptopp.publickey import rsa
from pycryptopp.cipher.aes import AES

from publish import Publish
from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \
     ResponseCache, UncoordinatedWriteError
from servermap import ServerMap, ServermapUpdater
from retrieve import Retrieve
from checker import MutableChecker, MutableCheckAndRepairer
from repairer import Repairer

class BackoffAgent:
    # these parameters are copied from foolscap.reconnector, which gets them
    # from twisted.internet.protocol.ReconnectingClientFactory
    initialDelay = 1.0
    factor = 2.7182818284590451 # (math.e)
    jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole

    def __init__(self):
        self._delay = self.initialDelay
        self._count = 0
    def delay(self, node, f):
        self._count += 1
        if self._count >= 4:
            return f # give up after 4 tries, as modify()'s docstring promises
        self._delay = self._delay * self.factor
        self._delay = random.normalvariate(self._delay,
                                           self._delay * self.jitter)
        d = defer.Deferred()
        reactor.callLater(self._delay, d.callback, None)
        return d
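
# Example (sketch): any callable with the same (node, failure) signature as
# BackoffAgent.delay can serve as the 'backoffer' argument to modify() below.
# A hypothetical backoffer that always waits a fixed two seconds and never
# gives up could look like:
#
#   def fixed_delay_backoffer(node, f):
#       d = defer.Deferred()
#       reactor.callLater(2.0, d.callback, None)
#       return d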

# use client.create_mutable_file() to make one of these

class MutableFileNode:
    implements(IMutableFileNode, ICheckable)
    SIGNATURE_KEY_SIZE = 2048
    checker_class = MutableChecker
    check_and_repairer_class = MutableCheckAndRepairer

    def __init__(self, client):
        self._client = client
        self._pubkey = None # filled in upon first read
        self._privkey = None # filled in if we're mutable
        # we keep track of the last encoding parameters that we use. These
        # are updated upon retrieve, and used by publish. If we publish
        # without ever reading (i.e. overwrite()), then we use these values.
        defaults = client.get_encoding_parameters()
        self._required_shares = defaults["k"]
        self._total_shares = defaults["n"]
        self._sharemap = {} # known shares, shnum-to-[nodeids]
        self._cache = ResponseCache()

        # all users of this MutableFileNode go through the serializer. This
        # takes advantage of the fact that Deferreds discard the callbacks
        # that they're done with, so we can keep using the same Deferred
        # forever without consuming more and more memory.
        self._serializer = defer.succeed(None)

    def __repr__(self):
        if hasattr(self, '_uri'):
            return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
        else:
            return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)

    def init_from_uri(self, myuri):
        # we have the URI, but we have not yet retrieved the public
        # verification key, nor things like 'k' or 'N'. If and when someone
        # wants to get our contents, we'll pull from shares and fill those
        # in.
        self._uri = IMutableFileURI(myuri)
        if not self._uri.is_readonly():
            self._writekey = self._uri.writekey
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        self._fingerprint = self._uri.fingerprint
        # the following values are learned during Retrieval
        #  self._pubkey
        #  self._required_shares
        #  self._total_shares
        # and these are needed for Publish. They are filled in by Retrieval
        # if possible, otherwise by the first peer that Publish talks to.
        self._privkey = None
        self._encprivkey = None

    def create(self, initial_contents, keypair_generator=None):
        """Call this when the filenode is first created. This will generate
        the keys, generate the initial shares, wait until at least numpeers
        are connected, allocate shares, and upload the initial
        contents. Returns a Deferred that fires (with the MutableFileNode
        instance you should use) when it completes.
        """
        d = defer.maybeDeferred(self._generate_pubprivkeys, keypair_generator)
        d.addCallback(self._generated)
        d.addCallback(lambda res: self._upload(initial_contents, None))
        return d

    def _generated(self, (pubkey, privkey) ):
        self._pubkey, self._privkey = pubkey, privkey
        pubkey_s = self._pubkey.serialize()
        privkey_s = self._privkey.serialize()
        self._writekey = hashutil.ssk_writekey_hash(privkey_s)
        self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
        self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
        self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index

    def _generate_pubprivkeys(self, keypair_generator):
        if keypair_generator:
            return keypair_generator(self.SIGNATURE_KEY_SIZE)
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2 secs
            signer = rsa.generate(self.SIGNATURE_KEY_SIZE)
            verifier = signer.get_verifying_key()
            return verifier, signer
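
    # Example (sketch): a keypair_generator passed to create() is called with
    # the key size in bits and must return a (verifying_key, signing_key)
    # pair, just like the default pycryptopp path above. A hypothetical
    # generator reusing pycryptopp directly:
    #
    #   def my_keypair_generator(key_size):
    #       signer = rsa.generate(key_size)
    #       return signer.get_verifying_key(), signer
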
    def _encrypt_privkey(self, writekey, privkey):
        enc = AES(writekey)
        crypttext = enc.process(privkey)
        return crypttext

    def _decrypt_privkey(self, enc_privkey):
        enc = AES(self._writekey)
        privkey = enc.process(enc_privkey)
        return privkey

    def _populate_pubkey(self, pubkey):
        self._pubkey = pubkey
    def _populate_required_shares(self, required_shares):
        self._required_shares = required_shares
    def _populate_total_shares(self, total_shares):
        self._total_shares = total_shares

    def _populate_privkey(self, privkey):
        self._privkey = privkey
    def _populate_encprivkey(self, encprivkey):
        self._encprivkey = encprivkey

    def get_write_enabler(self, peerid):
        assert len(peerid) == 20
        return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
    def get_renewal_secret(self, peerid):
        assert len(peerid) == 20
        crs = self._client.get_renewal_secret()
        frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
        return hashutil.bucket_renewal_secret_hash(frs, peerid)
    def get_cancel_secret(self, peerid):
        assert len(peerid) == 20
        ccs = self._client.get_cancel_secret()
        fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
        return hashutil.bucket_cancel_secret_hash(fcs, peerid)

    def get_writekey(self):
        return self._writekey
    def get_readkey(self):
        return self._readkey
    def get_storage_index(self):
        return self._storage_index
    def get_privkey(self):
        return self._privkey
    def get_encprivkey(self):
        return self._encprivkey
    def get_pubkey(self):
        return self._pubkey

    def get_required_shares(self):
        return self._required_shares
    def get_total_shares(self):
        return self._total_shares

    ####################################
    # IFilesystemNode

    def get_uri(self):
        return self._uri.to_string()
    def get_size(self):
        return "?" # TODO: this is likely to cause problems, not being an int
    def get_readonly(self):
        if self.is_readonly():
            return self
        ro = MutableFileNode(self._client)
        ro.init_from_uri(self._uri.get_readonly())
        return ro

    def get_readonly_uri(self):
        return self._uri.get_readonly().to_string()

    def is_mutable(self):
        return self._uri.is_mutable()
    def is_readonly(self):
        return self._uri.is_readonly()

    def __hash__(self):
        return hash((self.__class__, self._uri))
    def __cmp__(self, them):
        if cmp(type(self), type(them)):
            return cmp(type(self), type(them))
        if cmp(self.__class__, them.__class__):
            return cmp(self.__class__, them.__class__)
        return cmp(self._uri, them._uri)

    def get_verify_cap(self):
        return IMutableFileURI(self._uri).get_verify_cap()

    def get_repair_cap(self):
        if self._uri.is_readonly():
            return None
        return self._uri

    def _do_serialized(self, cb, *args, **kwargs):
        # note: to avoid deadlock, this callable is *not* allowed to invoke
        # other serialized methods within this (or any other)
        # MutableFileNode. The callable should be a bound method of this same
        # MutableFileNode instance.
        d = defer.Deferred()
        self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
        # we need to put off d.callback until this Deferred is finished being
        # processed. Otherwise the caller's subsequent activities (like,
        # doing other things with this node) can cause reentrancy problems in
        # the Deferred code itself
        self._serializer.addBoth(lambda res: eventually(d.callback, res))
        # add a log.err just in case something really weird happens, because
        # self._serializer stays around forever, therefore we won't see the
        # usual Unhandled Error in Deferred that would give us a hint.
        self._serializer.addErrback(log.err)
        return d
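
    # Example (sketch): because every public operation funnels through
    # _do_serialized(), two overlapping calls on the same node run one after
    # the other instead of interleaving. A hypothetical caller can safely do:
    #
    #   d1 = node.overwrite("version one")
    #   d2 = node.download_best_version()  # queued until the overwrite ends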

    #################################
    # ICheckable

    def check(self, monitor, verify=False, add_lease=False):
        checker = self.checker_class(self, monitor)
        return checker.check(verify, add_lease)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        checker = self.check_and_repairer_class(self, monitor)
        return checker.check(verify, add_lease)

    #################################

    def repair(self, check_results, force=False):
        assert ICheckResults(check_results)
        r = Repairer(self, check_results)
        return r.start(force)

    #################################

    # allow the use of IDownloadTarget
    def download(self, target):
        # fake it. TODO: make this cleaner.
        d = self.download_best_version()
        def _done(data):
            target.open(len(data))
            target.write(data)
            target.close()
            return target.finish()
        d.addCallback(_done)
        return d

    def download_best_version(self):
        return self._do_serialized(self._download_best_version)
    def _download_best_version(self):
        servermap = ServerMap()
        d = self._try_once_to_download_best_version(servermap, MODE_READ)
        def _maybe_retry(f):
            f.trap(NotEnoughSharesError)
            # the download is worth retrying once. Make sure to use the
            # old servermap, since it is what remembers the bad shares,
            # but use MODE_WRITE to make it look for even more shares.
            # TODO: consider allowing this to retry multiple times.. this
            # approach will let us tolerate about 8 bad shares, I think.
            return self._try_once_to_download_best_version(servermap,
                                                           MODE_WRITE)
        d.addErrback(_maybe_retry)
        return d
    def _try_once_to_download_best_version(self, servermap, mode):
        d = self._update_servermap(servermap, mode)
        d.addCallback(self._once_updated_download_best_version, servermap)
        return d
    def _once_updated_download_best_version(self, ignored, servermap):
        goal = servermap.best_recoverable_version()
        if not goal:
            raise UnrecoverableFileError("no recoverable versions")
        return self._try_once_to_download_version(servermap, goal)

    def get_size_of_best_version(self):
        d = self.get_servermap(MODE_READ)
        def _got_servermap(smap):
            ver = smap.best_recoverable_version()
            if not ver:
                raise UnrecoverableFileError("no recoverable version")
            return smap.size_of_version(ver)
        d.addCallback(_got_servermap)
        return d

    def overwrite(self, new_contents):
        return self._do_serialized(self._overwrite, new_contents)
    def _overwrite(self, new_contents):
        servermap = ServerMap()
        d = self._update_servermap(servermap, mode=MODE_WRITE)
        d.addCallback(lambda ignored: self._upload(new_contents, servermap))
        return d

    def modify(self, modifier, backoffer=None):
        """I use a modifier callback to apply a change to the mutable file.
        I implement the following pseudocode::

         obtain_mutable_filenode_lock()
         first_time = True
         while True:
           update_servermap(MODE_WRITE)
           old = retrieve_best_version()
           new = modifier(old, servermap, first_time)
           first_time = False
           if new == old: break
           try:
             publish(new)
           except UncoordinatedWriteError, e:
             backoffer(e)
             continue
           break
         release_mutable_filenode_lock()

        The idea is that your modifier function can apply a delta of some
        sort, and it will be re-run as necessary until it succeeds. The
        modifier must inspect the old version to see whether its delta has
        already been applied: if so it should return the contents unmodified.

        Note that the modifier is required to run synchronously, and must not
        invoke any methods on this MutableFileNode instance.

        The backoffer is a callable that is responsible for inserting a
        random delay between subsequent attempts, to keep competing updates
        from colliding forever. It is also allowed to give up after a while.
        The backoffer is given two arguments: this MutableFileNode, and the
        Failure object that contains the UncoordinatedWriteError. It should
        return a Deferred that will fire when the next attempt should be
        made, or return the Failure if the loop should give up. If
        backoffer=None, a default one is provided which will perform
        exponential backoff, and give up after 4 tries. Note that the
        backoffer should not invoke any methods on this MutableFileNode
        instance, and it needs to be highly conscious of deadlock issues.
        """
        return self._do_serialized(self._modify, modifier, backoffer)
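
    # Example (sketch): a modifier that appends one line. It is safe to
    # re-run because it first checks whether its delta has already been
    # applied; the appended line here is just a hypothetical value.
    #
    #   def add_line(old_contents, servermap, first_time):
    #       line = "new entry\n"
    #       if old_contents.endswith(line):
    #           return None  # already applied, nothing to publish
    #       return old_contents + line
    #
    #   d = node.modify(add_line)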

    def _modify(self, modifier, backoffer):
        servermap = ServerMap()
        if backoffer is None:
            backoffer = BackoffAgent().delay
        return self._modify_and_retry(servermap, modifier, backoffer, True)
    def _modify_and_retry(self, servermap, modifier, backoffer, first_time):
        d = self._modify_once(servermap, modifier, first_time)
        def _retry(f):
            f.trap(UncoordinatedWriteError)
            d2 = defer.maybeDeferred(backoffer, self, f)
            d2.addCallback(lambda ignored:
                           self._modify_and_retry(servermap, modifier,
                                                  backoffer, False))
            return d2
        d.addErrback(_retry)
        return d
    def _modify_once(self, servermap, modifier, first_time):
        d = self._update_servermap(servermap, MODE_WRITE)
        d.addCallback(self._once_updated_download_best_version, servermap)
        def _apply(old_contents):
            new_contents = modifier(old_contents, servermap, first_time)
            if new_contents is None or new_contents == old_contents:
                # no changes need to be made
                if first_time:
                    return
                # However, since Publish is not automatically doing a
                # recovery when it observes UCWE, we need to do a second
                # publish. See #551 for details. We'll basically loop until
                # we manage an uncontested publish.
                new_contents = old_contents
            precondition(isinstance(new_contents, str),
                         "Modifier function must return a string or None")
            return self._upload(new_contents, servermap)
        d.addCallback(_apply)
        return d

    def get_servermap(self, mode):
        return self._do_serialized(self._get_servermap, mode)
    def _get_servermap(self, mode):
        servermap = ServerMap()
        return self._update_servermap(servermap, mode)
    def _update_servermap(self, servermap, mode):
        u = ServermapUpdater(self, Monitor(), servermap, mode)
        history = self._client.get_history()
        if history:
            history.notify_mapupdate(u.get_status())
        return u.update()

    def download_version(self, servermap, version, fetch_privkey=False):
        return self._do_serialized(self._try_once_to_download_version,
                                   servermap, version, fetch_privkey)
    def _try_once_to_download_version(self, servermap, version,
                                      fetch_privkey=False):
        r = Retrieve(self, servermap, version, fetch_privkey)
        history = self._client.get_history()
        if history:
            history.notify_retrieve(r.get_status())
        return r.download()

    def upload(self, new_contents, servermap):
        return self._do_serialized(self._upload, new_contents, servermap)
    def _upload(self, new_contents, servermap):
        assert self._pubkey, "update_servermap must be called before publish"
        p = Publish(self, servermap)
        history = self._client.get_history()
        if history:
            history.notify_publish(p.get_status(), len(new_contents))
        return p.publish(new_contents)
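
# Example (sketch): typical use of one of these nodes, assuming that
# client.create_mutable_file() (mentioned above) returns a Deferred that
# fires with the newly created MutableFileNode:
#
#   d = client.create_mutable_file("initial contents")
#   def _created(node):
#       print node.get_uri()
#       return node.download_best_version()
#   d.addCallback(_created)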