import random

from zope.interface import implements
from twisted.internet import defer, reactor
from foolscap.api import eventually
from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \
     ICheckable, ICheckResults, NotEnoughSharesError
from allmydata.util import hashutil, log
from allmydata.util.assertutil import precondition
from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI
from allmydata.monitor import Monitor
from pycryptopp.cipher.aes import AES

from publish import Publish
from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \
     ResponseCache, UncoordinatedWriteError
from servermap import ServerMap, ServermapUpdater
from retrieve import Retrieve
from checker import MutableChecker, MutableCheckAndRepairer
from repairer import Repairer
class BackoffAgent:
    """Provide randomized exponential backoff between retry attempts.

    An instance's bound .delay method is the default 'backoffer' used by
    MutableFileNode.modify(): each call waits longer than the previous one
    (scaled by 'factor', jittered by 'jitter'), and after maxRetries
    attempts it gives up by returning the Failure instead of a Deferred.
    """
    # these parameters are copied from foolscap.reconnector, which gets them
    # from twisted.internet.protocol.ReconnectingClientFactory
    initialDelay = 1.0
    factor = 2.7182818284590451 # (math.e)
    jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
    maxRetries = 4

    def __init__(self):
        self._delay = self.initialDelay
        self._retries = 0

    def delay(self, node, f):
        """Return a Deferred that fires after the next backoff interval,
        or return the Failure 'f' itself once maxRetries is exceeded
        (which tells the modify() retry loop to give up)."""
        self._retries += 1
        if self._retries > self.maxRetries:
            return f # give up
        # grow the delay exponentially, then jitter it so competing
        # writers do not stay synchronized with each other
        self._delay = self._delay * self.factor
        self._delay = random.normalvariate(self._delay,
                                           self._delay * self.jitter)
        d = defer.Deferred()
        reactor.callLater(self._delay, d.callback, None)
        return d
46 # use nodemaker.create_mutable_file() to make one of these
48 class MutableFileNode:
49 implements(IMutableFileNode, ICheckable)
    def __init__(self, storage_broker, secret_holder,
                 default_encoding_parameters, history):
        """Create an uninitialized MutableFileNode.

        The caller must follow up with init_from_cap() (for an existing
        file) or create_with_keys() (for a brand-new one) before using the
        node. 'default_encoding_parameters' is a dict that must contain at
        least "k" (required shares) and "n" (total shares).
        """
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._default_encoding_parameters = default_encoding_parameters
        self._history = history # may be None; guarded before use elsewhere
        self._pubkey = None # filled in upon first read
        self._privkey = None # filled in if we're mutable
        # we keep track of the last encoding parameters that we use. These
        # are updated upon retrieve, and used by publish. If we publish
        # without ever reading (i.e. overwrite()), then we use these values.
        self._required_shares = default_encoding_parameters["k"]
        self._total_shares = default_encoding_parameters["n"]
        self._sharemap = {} # known shares, shnum-to-[nodeids]
        self._cache = ResponseCache()

        # all users of this MutableFileNode go through the serializer. This
        # takes advantage of the fact that Deferreds discard the callbacks
        # that they're done with, so we can keep using the same Deferred
        # forever without consuming more and more memory.
        self._serializer = defer.succeed(None)
74 if hasattr(self, '_uri'):
75 return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
77 return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)
79 def init_from_cap(self, filecap):
80 # we have the URI, but we have not yet retrieved the public
81 # verification key, nor things like 'k' or 'N'. If and when someone
82 # wants to get our contents, we'll pull from shares and fill those
84 assert isinstance(filecap, (ReadonlySSKFileURI, WriteableSSKFileURI))
87 if isinstance(filecap, WriteableSSKFileURI):
88 self._writekey = self._uri.writekey
89 self._readkey = self._uri.readkey
90 self._storage_index = self._uri.storage_index
91 self._fingerprint = self._uri.fingerprint
92 # the following values are learned during Retrieval
94 # self._required_shares
96 # and these are needed for Publish. They are filled in by Retrieval
97 # if possible, otherwise by the first peer that Publish talks to.
99 self._encprivkey = None
102 def create_with_keys(self, (pubkey, privkey), contents):
103 """Call this to create a brand-new mutable file. It will create the
104 shares, find homes for them, and upload the initial contents (created
105 with the same rules as IClient.create_mutable_file() ). Returns a
106 Deferred that fires (with the MutableFileNode instance you should
107 use) when it completes.
109 self._pubkey, self._privkey = pubkey, privkey
110 pubkey_s = self._pubkey.serialize()
111 privkey_s = self._privkey.serialize()
112 self._writekey = hashutil.ssk_writekey_hash(privkey_s)
113 self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
114 self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
115 self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
116 self._readkey = self._uri.readkey
117 self._storage_index = self._uri.storage_index
118 initial_contents = self._get_initial_contents(contents)
119 return self._upload(initial_contents, None)
121 def _get_initial_contents(self, contents):
122 if isinstance(contents, str):
126 assert callable(contents), "%s should be callable, not %s" % \
127 (contents, type(contents))
128 return contents(self)
130 def _encrypt_privkey(self, writekey, privkey):
132 crypttext = enc.process(privkey)
135 def _decrypt_privkey(self, enc_privkey):
136 enc = AES(self._writekey)
137 privkey = enc.process(enc_privkey)
    # simple setters: values learned from shares are recorded here
    # (presumably invoked as callbacks during retrieval/servermap update —
    # confirm against callers)
    def _populate_pubkey(self, pubkey):
        self._pubkey = pubkey
    def _populate_required_shares(self, required_shares):
        self._required_shares = required_shares
    def _populate_total_shares(self, total_shares):
        self._total_shares = total_shares

    def _populate_privkey(self, privkey):
        self._privkey = privkey
    def _populate_encprivkey(self, encprivkey):
        self._encprivkey = encprivkey
    def get_write_enabler(self, peerid):
        # the write-enabler shared secret is derived from our writekey and
        # the server's peerid, so each server holds a different value
        assert len(peerid) == 20
        return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
    def get_renewal_secret(self, peerid):
        # derivation chain for the per-bucket lease-renewal secret:
        # client secret -> per-file secret (keyed by storage index) ->
        # per-server secret (keyed by peerid)
        assert len(peerid) == 20
        crs = self._secret_holder.get_renewal_secret()
        frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
        return hashutil.bucket_renewal_secret_hash(frs, peerid)
    def get_cancel_secret(self, peerid):
        # same derivation chain as get_renewal_secret, but for lease
        # cancellation
        assert len(peerid) == 20
        ccs = self._secret_holder.get_cancel_secret()
        fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
        return hashutil.bucket_cancel_secret_hash(fcs, peerid)
167 def get_writekey(self):
168 return self._writekey
169 def get_readkey(self):
171 def get_storage_index(self):
172 return self._storage_index
173 def get_privkey(self):
175 def get_encprivkey(self):
176 return self._encprivkey
177 def get_pubkey(self):
180 def get_required_shares(self):
181 return self._required_shares
182 def get_total_shares(self):
183 return self._total_shares
185 ####################################
189 return "?" # TODO: this is likely to cause problems, not being an int
193 def get_readcap(self):
194 return self._uri.get_readonly()
195 def get_verify_cap(self):
196 return IMutableFileURI(self._uri).get_verify_cap()
197 def get_repair_cap(self):
198 if self._uri.is_readonly():
203 return self._uri.to_string()
204 def get_readonly_uri(self):
205 return self._uri.get_readonly().to_string()
207 def get_readonly(self):
208 if self.is_readonly():
210 ro = MutableFileNode(self._storage_broker, self._secret_holder,
211 self._default_encoding_parameters, self._history)
212 ro.init_from_cap(self._uri.get_readonly())
    def is_mutable(self):
        # delegated to the filecap
        return self._uri.is_mutable()
    def is_readonly(self):
        # True for read-caps, False for write-caps
        return self._uri.is_readonly()
221 return hash((self.__class__, self._uri))
222 def __cmp__(self, them):
223 if cmp(type(self), type(them)):
224 return cmp(type(self), type(them))
225 if cmp(self.__class__, them.__class__):
226 return cmp(self.__class__, them.__class__)
227 return cmp(self._uri, them._uri)
229 def _do_serialized(self, cb, *args, **kwargs):
230 # note: to avoid deadlock, this callable is *not* allowed to invoke
231 # other serialized methods within this (or any other)
232 # MutableFileNode. The callable should be a bound method of this same
235 self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
236 # we need to put off d.callback until this Deferred is finished being
237 # processed. Otherwise the caller's subsequent activities (like,
238 # doing other things with this node) can cause reentrancy problems in
239 # the Deferred code itself
240 self._serializer.addBoth(lambda res: eventually(d.callback, res))
241 # add a log.err just in case something really weird happens, because
242 # self._serializer stays around forever, therefore we won't see the
243 # usual Unhandled Error in Deferred that would give us a hint.
244 self._serializer.addErrback(log.err)
247 #################################
250 def check(self, monitor, verify=False, add_lease=False):
251 checker = MutableChecker(self, self._storage_broker,
252 self._history, monitor)
253 return checker.check(verify, add_lease)
255 def check_and_repair(self, monitor, verify=False, add_lease=False):
256 checker = MutableCheckAndRepairer(self, self._storage_broker,
257 self._history, monitor)
258 return checker.check(verify, add_lease)
260 #################################
263 def repair(self, check_results, force=False):
264 assert ICheckResults(check_results)
265 r = Repairer(self, check_results)
270 #################################
273 # allow the use of IDownloadTarget
274 def download(self, target):
275 # fake it. TODO: make this cleaner.
276 d = self.download_best_version()
278 target.open(len(data))
281 return target.finish()
288 def download_best_version(self):
289 return self._do_serialized(self._download_best_version)
290 def _download_best_version(self):
291 servermap = ServerMap()
292 d = self._try_once_to_download_best_version(servermap, MODE_READ)
294 f.trap(NotEnoughSharesError)
295 # the download is worth retrying once. Make sure to use the
296 # old servermap, since it is what remembers the bad shares,
297 # but use MODE_WRITE to make it look for even more shares.
298 # TODO: consider allowing this to retry multiple times.. this
299 # approach will let us tolerate about 8 bad shares, I think.
300 return self._try_once_to_download_best_version(servermap,
302 d.addErrback(_maybe_retry)
304 def _try_once_to_download_best_version(self, servermap, mode):
305 d = self._update_servermap(servermap, mode)
306 d.addCallback(self._once_updated_download_best_version, servermap)
308 def _once_updated_download_best_version(self, ignored, servermap):
309 goal = servermap.best_recoverable_version()
311 raise UnrecoverableFileError("no recoverable versions")
312 return self._try_once_to_download_version(servermap, goal)
314 def get_size_of_best_version(self):
315 d = self.get_servermap(MODE_READ)
316 def _got_servermap(smap):
317 ver = smap.best_recoverable_version()
319 raise UnrecoverableFileError("no recoverable version")
320 return smap.size_of_version(ver)
321 d.addCallback(_got_servermap)
324 def overwrite(self, new_contents):
325 return self._do_serialized(self._overwrite, new_contents)
326 def _overwrite(self, new_contents):
327 servermap = ServerMap()
328 d = self._update_servermap(servermap, mode=MODE_WRITE)
329 d.addCallback(lambda ignored: self._upload(new_contents, servermap))
333 def modify(self, modifier, backoffer=None):
334 """I use a modifier callback to apply a change to the mutable file.
335 I implement the following pseudocode::
337 obtain_mutable_filenode_lock()
340 update_servermap(MODE_WRITE)
341 old = retrieve_best_version()
342 new = modifier(old, servermap, first_time)
347 except UncoordinatedWriteError, e:
351 release_mutable_filenode_lock()
353 The idea is that your modifier function can apply a delta of some
354 sort, and it will be re-run as necessary until it succeeds. The
355 modifier must inspect the old version to see whether its delta has
356 already been applied: if so it should return the contents unmodified.
358 Note that the modifier is required to run synchronously, and must not
359 invoke any methods on this MutableFileNode instance.
361 The backoff-er is a callable that is responsible for inserting a
362 random delay between subsequent attempts, to help competing updates
363 from colliding forever. It is also allowed to give up after a while.
364 The backoffer is given two arguments: this MutableFileNode, and the
365 Failure object that contains the UncoordinatedWriteError. It should
366 return a Deferred that will fire when the next attempt should be
367 made, or return the Failure if the loop should give up. If
368 backoffer=None, a default one is provided which will perform
369 exponential backoff, and give up after 4 tries. Note that the
370 backoffer should not invoke any methods on this MutableFileNode
371 instance, and it needs to be highly conscious of deadlock issues.
373 return self._do_serialized(self._modify, modifier, backoffer)
374 def _modify(self, modifier, backoffer):
375 servermap = ServerMap()
376 if backoffer is None:
377 backoffer = BackoffAgent().delay
378 return self._modify_and_retry(servermap, modifier, backoffer, True)
379 def _modify_and_retry(self, servermap, modifier, backoffer, first_time):
380 d = self._modify_once(servermap, modifier, first_time)
382 f.trap(UncoordinatedWriteError)
383 d2 = defer.maybeDeferred(backoffer, self, f)
384 d2.addCallback(lambda ignored:
385 self._modify_and_retry(servermap, modifier,
390 def _modify_once(self, servermap, modifier, first_time):
391 d = self._update_servermap(servermap, MODE_WRITE)
392 d.addCallback(self._once_updated_download_best_version, servermap)
393 def _apply(old_contents):
394 new_contents = modifier(old_contents, servermap, first_time)
395 if new_contents is None or new_contents == old_contents:
396 # no changes need to be made
399 # However, since Publish is not automatically doing a
400 # recovery when it observes UCWE, we need to do a second
401 # publish. See #551 for details. We'll basically loop until
402 # we managed an uncontested publish.
403 new_contents = old_contents
404 precondition(isinstance(new_contents, str),
405 "Modifier function must return a string or None")
406 return self._upload(new_contents, servermap)
407 d.addCallback(_apply)
410 def get_servermap(self, mode):
411 return self._do_serialized(self._get_servermap, mode)
412 def _get_servermap(self, mode):
413 servermap = ServerMap()
414 return self._update_servermap(servermap, mode)
415 def _update_servermap(self, servermap, mode):
416 u = ServermapUpdater(self, self._storage_broker, Monitor(), servermap,
419 self._history.notify_mapupdate(u.get_status())
422 def download_version(self, servermap, version, fetch_privkey=False):
423 return self._do_serialized(self._try_once_to_download_version,
424 servermap, version, fetch_privkey)
425 def _try_once_to_download_version(self, servermap, version,
426 fetch_privkey=False):
427 r = Retrieve(self, servermap, version, fetch_privkey)
429 self._history.notify_retrieve(r.get_status())
432 def upload(self, new_contents, servermap):
433 return self._do_serialized(self._upload, new_contents, servermap)
434 def _upload(self, new_contents, servermap):
435 assert self._pubkey, "update_servermap must be called before publish"
436 p = Publish(self, self._storage_broker, servermap)
438 self._history.notify_publish(p.get_status(), len(new_contents))
439 return p.publish(new_contents)