4 from zope.interface import implements
5 from twisted.internet import defer, reactor
6 from foolscap.api import eventually
7 from allmydata.interfaces import IMutableFileNode, \
8 ICheckable, ICheckResults, NotEnoughSharesError
9 from allmydata.util import hashutil, log
10 from allmydata.util.assertutil import precondition
11 from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI
12 from allmydata.monitor import Monitor
13 from pycryptopp.cipher.aes import AES
15 from publish import Publish
16 from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \
17 ResponseCache, UncoordinatedWriteError
18 from servermap import ServerMap, ServermapUpdater
19 from retrieve import Retrieve
20 from checker import MutableChecker, MutableCheckAndRepairer
21 from repairer import Repairer
    # NOTE(review): the enclosing class statement for this backoff helper is
    # elided from this chunk; the lines below are its class body, re-indented.

    # these parameters are copied from foolscap.reconnector, which gets them
    # from twisted.internet.protocol.ReconnectingClientFactory
    factor = 2.7182818284590451 # (math.e)
    jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole

        # NOTE(review): the "def" line for this initializer is elided; this
        # resets the delay to its configured starting value
        self._delay = self.initialDelay

    def delay(self, node, f):
        # grow the delay geometrically, then add gaussian jitter so that
        # competing writers do not retry in lock-step
        self._delay = self._delay * self.factor
        self._delay = random.normalvariate(self._delay,
                                           self._delay * self.jitter)
        # NOTE(review): creation and return of the Deferred `d` are elided
        # from this chunk; it fires after the computed delay
        reactor.callLater(self._delay, d.callback, None)
# use nodemaker.create_mutable_file() to make one of these

class MutableFileNode:
    implements(IMutableFileNode, ICheckable)

    def __init__(self, storage_broker, secret_holder,
                 default_encoding_parameters, history):
        """Create an in-memory handle for a mutable file.

        The caps/keys are filled in later, by init_from_cap() or
        create_with_keys().
        """
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._default_encoding_parameters = default_encoding_parameters
        self._history = history
        self._pubkey = None # filled in upon first read
        self._privkey = None # filled in if we're mutable
        # we keep track of the last encoding parameters that we use. These
        # are updated upon retrieve, and used by publish. If we publish
        # without ever reading (i.e. overwrite()), then we use these values.
        self._required_shares = default_encoding_parameters["k"]
        self._total_shares = default_encoding_parameters["n"]
        self._sharemap = {} # known shares, shnum-to-[nodeids]
        self._cache = ResponseCache()
        self._most_recent_size = None # updated after downloads/publishes

        # all users of this MutableFileNode go through the serializer. This
        # takes advantage of the fact that Deferreds discard the callbacks
        # that they're done with, so we can keep using the same Deferred
        # forever without consuming more and more memory.
        self._serializer = defer.succeed(None)
    # NOTE(review): the "def __repr__(self):" line and the "else:" that
    # separated the two returns are elided from this chunk.
        if hasattr(self, '_uri'):
            # once a cap is present, show RO/RW status plus an abbreviated cap
            return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
        # before init_from_cap()/create_with_keys(), there is no URI yet
        return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)
    def init_from_cap(self, filecap):
        # we have the URI, but we have not yet retrieved the public
        # verification key, nor things like 'k' or 'N'. If and when someone
        # wants to get our contents, we'll pull from shares and fill those
        # in.
        # NOTE(review): the line that stores `filecap` into self._uri is
        # elided from this chunk; the attribute reads below depend on it.
        assert isinstance(filecap, (ReadonlySSKFileURI, WriteableSSKFileURI))
        # only writecaps carry a writekey
        if isinstance(filecap, WriteableSSKFileURI):
            self._writekey = self._uri.writekey
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        self._fingerprint = self._uri.fingerprint
        # the following values are learned during Retrieval
        # self._required_shares
        #  (NOTE(review): companion lines for other learned attributes are
        #   elided from this chunk)
        # and these are needed for Publish. They are filled in by Retrieval
        # if possible, otherwise by the first peer that Publish talks to.
        self._encprivkey = None
    def create_with_keys(self, (pubkey, privkey), contents):
        """Call this to create a brand-new mutable file. It will create the
        shares, find homes for them, and upload the initial contents (created
        with the same rules as IClient.create_mutable_file() ). Returns a
        Deferred that fires (with the MutableFileNode instance you should
        use) when it completes.
        """
        self._pubkey, self._privkey = pubkey, privkey
        pubkey_s = self._pubkey.serialize()
        privkey_s = self._privkey.serialize()
        # the writekey is a hash of the serialized private key, and the
        # private key itself is stored encrypted under that writekey
        self._writekey = hashutil.ssk_writekey_hash(privkey_s)
        self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
        self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
        self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
        # readkey and storage index are derived by the URI class
        self._readkey = self._uri.readkey
        self._storage_index = self._uri.storage_index
        initial_contents = self._get_initial_contents(contents)
        return self._upload(initial_contents, None)
    def _get_initial_contents(self, contents):
        # accept either a plain string of initial contents, or a callable
        # that produces them when given this node
        if isinstance(contents, str):
            # NOTE(review): the plain-string return path is elided from
            # this chunk
        assert callable(contents), "%s should be callable, not %s" % \
               (contents, type(contents))
        return contents(self)
    def _encrypt_privkey(self, writekey, privkey):
        # encrypt the serialized private key under the writekey
        # NOTE(review): construction of the AES cipher (and the return of
        # `crypttext`) are elided from this chunk
        crypttext = enc.process(privkey)

    def _decrypt_privkey(self, enc_privkey):
        # inverse of _encrypt_privkey; the same AES .process() call is used
        # in both directions — presumably a counter/stream mode, confirm
        # against the pycryptopp AES documentation
        enc = AES(self._writekey)
        privkey = enc.process(enc_privkey)
    # trivial setters: these fill in values that (per the comments in
    # init_from_cap) are learned during Retrieval or from peers at Publish
    def _populate_pubkey(self, pubkey):
        self._pubkey = pubkey
    def _populate_required_shares(self, required_shares):
        self._required_shares = required_shares
    def _populate_total_shares(self, total_shares):
        self._total_shares = total_shares

    def _populate_privkey(self, privkey):
        self._privkey = privkey
    def _populate_encprivkey(self, encprivkey):
        self._encprivkey = encprivkey

    # thin delegations to the ResponseCache created in __init__
    def _add_to_cache(self, verinfo, shnum, offset, data, timestamp):
        self._cache.add(verinfo, shnum, offset, data, timestamp)
    def _read_from_cache(self, verinfo, shnum, offset, length):
        return self._cache.read(verinfo, shnum, offset, length)
157 def get_write_enabler(self, peerid):
158 assert len(peerid) == 20
159 return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
160 def get_renewal_secret(self, peerid):
161 assert len(peerid) == 20
162 crs = self._secret_holder.get_renewal_secret()
163 frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
164 return hashutil.bucket_renewal_secret_hash(frs, peerid)
165 def get_cancel_secret(self, peerid):
166 assert len(peerid) == 20
167 ccs = self._secret_holder.get_cancel_secret()
168 fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
169 return hashutil.bucket_cancel_secret_hash(fcs, peerid)
    # simple attribute accessors
    def get_writekey(self):
        return self._writekey
    def get_readkey(self):
        # NOTE(review): the return statement is elided from this chunk
    def get_storage_index(self):
        return self._storage_index
    def get_fingerprint(self):
        return self._fingerprint
    def get_privkey(self):
        # NOTE(review): the return statement is elided from this chunk
    def get_encprivkey(self):
        return self._encprivkey
    def get_pubkey(self):
        # NOTE(review): the return statement is elided from this chunk

    def get_required_shares(self):
        return self._required_shares
    def get_total_shares(self):
        return self._total_shares
    ####################################
    # file-size accessors
    # NOTE(review): the "def" line the return below belongs to (a cached
    # size getter) is elided from this chunk
        return self._most_recent_size
    def get_current_size(self):
        # query the size of the best recoverable version and remember it
        d = self.get_size_of_best_version()
        d.addCallback(self._stash_size)
        # NOTE(review): "return d" is elided here
    def _stash_size(self, size):
        self._most_recent_size = size
        # NOTE(review): the pass-through return is elided here
    def get_readcap(self):
        # the read-only form of whichever cap we hold
        return self._uri.get_readonly()
    def get_verify_cap(self):
        return self._uri.get_verify_cap()
    def get_repair_cap(self):
        if self._uri.is_readonly():
            # NOTE(review): the readonly branch, and the following lines up
            # to the next visible return, are elided from this chunk
    # NOTE(review): the "def" line this return belongs to is elided
        return self._uri.to_string()

    def get_write_uri(self):
        if self.is_readonly():
            # NOTE(review): the readonly-case return is elided here
        return self._uri.to_string()

    def get_readonly_uri(self):
        return self._uri.get_readonly().to_string()
    def get_readonly(self):
        if self.is_readonly():
            # NOTE(review): the already-readonly early return is elided
        # build a sibling node around the read-only form of our cap
        ro = MutableFileNode(self._storage_broker, self._secret_holder,
                             self._default_encoding_parameters, self._history)
        ro.init_from_cap(self._uri.get_readonly())
        # NOTE(review): "return ro" (and possibly cache sharing) is elided

    def is_mutable(self):
        return self._uri.is_mutable()

    def is_readonly(self):
        return self._uri.is_readonly()

    def is_unknown(self):
        # NOTE(review): the return statement is elided from this chunk

    def is_allowed_in_immutable_directory(self):
        # only immutable caps may live inside immutable directories
        return not self._uri.is_mutable()

    def raise_error(self):
        # NOTE(review): the body is elided from this chunk
    # NOTE(review): the "def __hash__(self):" line is elided; nodes hash by
    # class plus URI, matching the __cmp__ ordering below
        return hash((self.__class__, self._uri))
    def __cmp__(self, them):
        # Python-2-style three-way comparison: order first by concrete type,
        # then by class, and finally by the underlying URI
        if cmp(type(self), type(them)):
            return cmp(type(self), type(them))
        if cmp(self.__class__, them.__class__):
            return cmp(self.__class__, them.__class__)
        return cmp(self._uri, them._uri)
    def _do_serialized(self, cb, *args, **kwargs):
        # note: to avoid deadlock, this callable is *not* allowed to invoke
        # other serialized methods within this (or any other)
        # MutableFileNode. The callable should be a bound method of this same
        # node.
        # NOTE(review): creation of the result Deferred `d` is elided from
        # this chunk.
        self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
        # we need to put off d.callback until this Deferred is finished being
        # processed. Otherwise the caller's subsequent activities (like,
        # doing other things with this node) can cause reentrancy problems in
        # the Deferred code itself
        self._serializer.addBoth(lambda res: eventually(d.callback, res))
        # add a log.err just in case something really weird happens, because
        # self._serializer stays around forever, therefore we won't see the
        # usual Unhandled Error in Deferred that would give us a hint.
        self._serializer.addErrback(log.err)
276 #################################
279 def check(self, monitor, verify=False, add_lease=False):
280 checker = MutableChecker(self, self._storage_broker,
281 self._history, monitor)
282 return checker.check(verify, add_lease)
284 def check_and_repair(self, monitor, verify=False, add_lease=False):
285 checker = MutableCheckAndRepairer(self, self._storage_broker,
286 self._history, monitor)
287 return checker.check(verify, add_lease)
289 #################################
    def repair(self, check_results, force=False):
        # the adapter call doubles as a cheap interface check on the input
        assert ICheckResults(check_results)
        r = Repairer(self, check_results)
        # NOTE(review): starting the repairer and returning its Deferred are
        # elided from this chunk
299 #################################
    def download_best_version(self):
        """Serialized entry point: fetch the best recoverable version."""
        return self._do_serialized(self._download_best_version)
    def _download_best_version(self):
        servermap = ServerMap()
        d = self._try_once_to_download_best_version(servermap, MODE_READ)
        # NOTE(review): the "def _maybe_retry(f):" line is elided here
            f.trap(NotEnoughSharesError)
            # the download is worth retrying once. Make sure to use the
            # old servermap, since it is what remembers the bad shares,
            # but use MODE_WRITE to make it look for even more shares.
            # TODO: consider allowing this to retry multiple times.. this
            # approach will let us tolerate about 8 bad shares, I think.
            return self._try_once_to_download_best_version(servermap,
            # NOTE(review): the closing MODE_WRITE argument line and the
            # trailing "return d" are elided from this chunk
        d.addErrback(_maybe_retry)
    def _try_once_to_download_best_version(self, servermap, mode):
        # refresh the servermap first, then pick and fetch the best version
        d = self._update_servermap(servermap, mode)
        d.addCallback(self._once_updated_download_best_version, servermap)
        # NOTE(review): "return d" is elided here

    def _once_updated_download_best_version(self, ignored, servermap):
        goal = servermap.best_recoverable_version()
        # NOTE(review): the guard line ("if not goal:") is elided here
            raise UnrecoverableFileError("no recoverable versions")
        return self._try_once_to_download_version(servermap, goal)
    def get_size_of_best_version(self):
        # consult a fresh read-mode servermap for the best version's size
        d = self.get_servermap(MODE_READ)
        def _got_servermap(smap):
            ver = smap.best_recoverable_version()
            # NOTE(review): the guard line ("if not ver:") is elided here
                raise UnrecoverableFileError("no recoverable version")
            return smap.size_of_version(ver)
        d.addCallback(_got_servermap)
        # NOTE(review): "return d" is elided here
    def overwrite(self, new_contents):
        """Serialized entry point: replace the file's contents entirely."""
        return self._do_serialized(self._overwrite, new_contents)
    def _overwrite(self, new_contents):
        # map the grid with write intent, then publish over whatever exists
        servermap = ServerMap()
        d = self._update_servermap(servermap, mode=MODE_WRITE)
        d.addCallback(lambda ignored: self._upload(new_contents, servermap))
        # NOTE(review): "return d" is elided here
    def modify(self, modifier, backoffer=None):
        """I use a modifier callback to apply a change to the mutable file.
        I implement the following pseudocode::

         obtain_mutable_filenode_lock()
         ...  (NOTE(review): some pseudocode lines elided from this chunk)
         update_servermap(MODE_WRITE)
         old = retrieve_best_version()
         new = modifier(old, servermap, first_time)
         ...
         except UncoordinatedWriteError, e:
         ...
         release_mutable_filenode_lock()

        The idea is that your modifier function can apply a delta of some
        sort, and it will be re-run as necessary until it succeeds. The
        modifier must inspect the old version to see whether its delta has
        already been applied: if so it should return the contents unmodified.

        Note that the modifier is required to run synchronously, and must not
        invoke any methods on this MutableFileNode instance.

        The backoff-er is a callable that is responsible for inserting a
        random delay between subsequent attempts, to help competing updates
        from colliding forever. It is also allowed to give up after a while.
        The backoffer is given two arguments: this MutableFileNode, and the
        Failure object that contains the UncoordinatedWriteError. It should
        return a Deferred that will fire when the next attempt should be
        made, or return the Failure if the loop should give up. If
        backoffer=None, a default one is provided which will perform
        exponential backoff, and give up after 4 tries. Note that the
        backoffer should not invoke any methods on this MutableFileNode
        instance, and it needs to be highly conscious of deadlock issues.
        """
        return self._do_serialized(self._modify, modifier, backoffer)
388 def _modify(self, modifier, backoffer):
389 servermap = ServerMap()
390 if backoffer is None:
391 backoffer = BackoffAgent().delay
392 return self._modify_and_retry(servermap, modifier, backoffer, True)
    def _modify_and_retry(self, servermap, modifier, backoffer, first_time):
        d = self._modify_once(servermap, modifier, first_time)
        # NOTE(review): the inner "def _retry(f):" line is elided here
            f.trap(UncoordinatedWriteError)
            # ask the backoffer when (or whether) to try again
            d2 = defer.maybeDeferred(backoffer, self, f)
            d2.addCallback(lambda ignored:
                           self._modify_and_retry(servermap, modifier,
            # NOTE(review): the remaining recursive-call arguments, the
            # errback hookup, and the final "return d" are elided from
            # this chunk
    def _modify_once(self, servermap, modifier, first_time):
        # one update/retrieve/modify/publish cycle
        d = self._update_servermap(servermap, MODE_WRITE)
        d.addCallback(self._once_updated_download_best_version, servermap)
        def _apply(old_contents):
            new_contents = modifier(old_contents, servermap, first_time)
            if new_contents is None or new_contents == old_contents:
                # no changes need to be made
                # NOTE(review): the first_time early-return branch is
                # elided from this chunk
                # However, since Publish is not automatically doing a
                # recovery when it observes UCWE, we need to do a second
                # publish. See #551 for details. We'll basically loop until
                # we managed an uncontested publish.
                new_contents = old_contents
            precondition(isinstance(new_contents, str),
                         "Modifier function must return a string or None")
            return self._upload(new_contents, servermap)
        d.addCallback(_apply)
        # NOTE(review): "return d" is elided here
424 def get_servermap(self, mode):
425 return self._do_serialized(self._get_servermap, mode)
426 def _get_servermap(self, mode):
427 servermap = ServerMap()
428 return self._update_servermap(servermap, mode)
    def _update_servermap(self, servermap, mode):
        u = ServermapUpdater(self, self._storage_broker, Monitor(), servermap,
        # NOTE(review): the remaining constructor arguments and the
        # "if self._history:" guard are elided from this chunk
            self._history.notify_mapupdate(u.get_status())
        # NOTE(review): starting the update and returning its Deferred are
        # elided here
    def download_version(self, servermap, version, fetch_privkey=False):
        """Serialized entry point: download one specific version."""
        return self._do_serialized(self._try_once_to_download_version,
                                   servermap, version, fetch_privkey)
    def _try_once_to_download_version(self, servermap, version,
                                      fetch_privkey=False):
        r = Retrieve(self, servermap, version, fetch_privkey)
        # NOTE(review): the "if self._history:" guard is elided here
            self._history.notify_retrieve(r.get_status())
        # NOTE(review): starting the retrieve (assignment of `d`) is elided
        d.addCallback(self._downloaded_version)
        # NOTE(review): "return d" is elided here
    def _downloaded_version(self, data):
        # remember the plaintext size for the size accessors
        self._most_recent_size = len(data)
        # NOTE(review): the pass-through "return data" is elided here
    def upload(self, new_contents, servermap):
        """Serialized entry point: publish new_contents via servermap."""
        return self._do_serialized(self._upload, new_contents, servermap)
    def _upload(self, new_contents, servermap):
        assert self._pubkey, "update_servermap must be called before publish"
        p = Publish(self, self._storage_broker, servermap)
        # NOTE(review): the "if self._history:" guard is elided here
            self._history.notify_publish(p.get_status(), len(new_contents))
        d = p.publish(new_contents)
        d.addCallback(self._did_upload, len(new_contents))
        # NOTE(review): "return d" is elided here
    def _did_upload(self, res, size):
        # record the published size, then (elided) pass the result through
        self._most_recent_size = size